summaryrefslogtreecommitdiff
path: root/src/Network/QueryResponse.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/QueryResponse.hs')
-rw-r--r--src/Network/QueryResponse.hs638
1 files changed, 0 insertions, 638 deletions
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs
deleted file mode 100644
index c4ff50e3..00000000
--- a/src/Network/QueryResponse.hs
+++ /dev/null
@@ -1,638 +0,0 @@
1-- | This module can implement any query\/response protocol. It was written
2-- with Kademlia implementations in mind.
3
4{-# LANGUAGE CPP #-}
5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE ScopedTypeVariables #-}
10{-# LANGUAGE TupleSections #-}
11module Network.QueryResponse where
12
13#ifdef THREAD_DEBUG
14import Control.Concurrent.Lifted.Instrument
15#else
16import Control.Concurrent
17import GHC.Conc (labelThread)
18#endif
19import Control.Concurrent.STM
20import Control.Exception
21import Control.Monad
22import qualified Data.ByteString as B
23 ;import Data.ByteString (ByteString)
24import Data.Function
25import Data.Functor.Contravariant
26import qualified Data.IntMap.Strict as IntMap
27 ;import Data.IntMap.Strict (IntMap)
28import qualified Data.Map.Strict as Map
29 ;import Data.Map.Strict (Map)
30import Data.Time.Clock.POSIX
31import qualified Data.Word64Map as W64Map
32 ;import Data.Word64Map (Word64Map)
33import Data.Word
34import Data.Maybe
35import GHC.Event
36import Network.Socket
37import Network.Socket.ByteString as B
38import System.Endian
39import System.IO
40import System.IO.Error
41import System.Timeout
42import DPut
43import DebugTag
44import Data.TableMethods
45
46-- | Three methods are required to implement a datagram based query\/response protocol.
47data TransportA err addr x y = Transport
48 { -- | Blocks until an inbound packet is available. Returns 'Nothing' when
49 -- no more packets are expected due to a shutdown or close event.
50 -- Otherwise, the packet will be parsed as type /x/ and an origin address
51 -- /addr/. Parse failure is indicated by the type 'err'.
52 awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a
53 -- | Send an /y/ packet to the given destination /addr/.
54 , sendMessage :: addr -> y -> IO ()
55 -- | Shutdown and clean up any state related to this 'Transport'.
56 , closeTransport :: IO ()
57 }
58
59type Transport err addr x = TransportA err addr x x
60
61-- | This function modifies a 'Transport' to use higher-level addresses and
62-- packet representations. It could be used to change UDP 'ByteString's into
63-- bencoded syntax trees or to add an encryption layer in which addresses have
64-- associated public keys.
65layerTransportM ::
66 (x -> addr -> IO (Either err (x', addr')))
67 -- ^ Function that attempts to transform a low-level address/packet
68 -- pair into a higher level representation.
69 -> (y' -> addr' -> IO (y, addr))
70 -- ^ Function to encode a high-level address/packet into a lower level
71 -- representation.
72 -> TransportA err addr x y
73 -- ^ The low-level transport to be transformed.
74 -> TransportA err addr' x' y'
75layerTransportM parse encode tr =
76 tr { awaitMessage = \kont ->
77 awaitMessage tr $ \m -> mapM (mapM $ uncurry parse) m >>= kont . fmap join
78 , sendMessage = \addr' msg' -> do
79 (msg,addr) <- encode msg' addr'
80 sendMessage tr addr msg
81 }
82
83
84-- | This function modifies a 'Transport' to use higher-level addresses and
85-- packet representations. It could be used to change UDP 'ByteString's into
86-- bencoded syntax trees or to add an encryption layer in which addresses have
87-- associated public keys.
88layerTransport ::
89 (x -> addr -> Either err (x', addr'))
90 -- ^ Function that attempts to transform a low-level address/packet
91 -- pair into a higher level representation.
92 -> (y' -> addr' -> (y, addr))
93 -- ^ Function to encode a high-level address/packet into a lower level
94 -- representation.
95 -> TransportA err addr x y
96 -- ^ The low-level transport to be transformed.
97 -> TransportA err addr' x' y'
98layerTransport parse encode tr =
99 layerTransportM (\x addr -> return $ parse x addr)
100 (\x' addr' -> return $ encode x' addr')
101 tr
102
103-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar'
104-- is used to share the same underlying socket, so be sure to fork a thread for
105-- both returned 'Transport's to avoid hanging.
106partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a))
107 -> ((x,xaddr) -> Maybe (b,a))
108 -> Transport err a b
109 -> IO (Transport err xaddr x, Transport err a b)
110partitionTransport parse encodex tr =
111 partitionTransportM (return . parse) (return . encodex) tr
112
113-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar'
114-- is used to share the same underlying socket, so be sure to fork a thread for
115-- both returned 'Transport's to avoid hanging.
116partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
117 -> ((x,xaddr) -> IO (Maybe (b,a)))
118 -> Transport err a b
119 -> IO (Transport err xaddr x, Transport err a b)
120partitionTransportM parse encodex tr = do
121 mvar <- newEmptyMVar
122 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do
123 awaitMessage tr $ \m -> case m of
124 Just (Right msg) -> parse msg >>=
125 either (kont . Just . Right)
126 (\y -> putMVar mvar y >> again)
127 Just (Left e) -> kont $ Just (Left e)
128 Nothing -> kont Nothing
129 , sendMessage = \addr' msg' -> do
130 msg_addr <- encodex (msg',addr')
131 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
132 }
133 ytr = Transport
134 { awaitMessage = \kont -> takeMVar mvar >>= kont . Just . Right
135 , sendMessage = sendMessage tr
136 , closeTransport = return ()
137 }
138 return (xtr, ytr)
139
140partitionAndForkTransport ::
141 (dst -> msg -> IO ())
142 -> ((b,a) -> IO (Either (x,xaddr) (b,a)))
143 -> ((x,xaddr) -> IO (Maybe (Either (msg,dst) (b,a))))
144 -> Transport err a b
145 -> IO (Transport err xaddr x, Transport err a b)
146partitionAndForkTransport forkedSend parse encodex tr = do
147 mvar <- newEmptyMVar
148 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do
149 awaitMessage tr $ \m -> case m of
150 Just (Right msg) -> parse msg >>=
151 either (kont . Just . Right)
152 (\y -> putMVar mvar y >> again)
153 Just (Left e) -> kont $ Just (Left e)
154 Nothing -> kont Nothing
155 , sendMessage = \addr' msg' -> do
156 msg_addr <- encodex (msg',addr')
157 case msg_addr of
158 Just (Right (b,a)) -> sendMessage tr a b
159 Just (Left (msg,dst)) -> forkedSend dst msg
160 Nothing -> return ()
161 }
162 ytr = Transport
163 { awaitMessage = \kont -> takeMVar mvar >>= kont . Just . Right
164 , sendMessage = sendMessage tr
165 , closeTransport = return ()
166 }
167 return (xtr, ytr)
168
169-- |
170-- * f add x --> Nothing, consume x
171-- --> Just id, leave x to a different handler
172-- --> Just g, apply g to x and leave that to a different handler
173addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x
174addHandler onParseError f tr = tr
175 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do
176 case m of
177 Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x))
178 Just (Left e ) -> onParseError e >> kont (Just $ Left e)
179 Nothing -> kont $ Nothing
180 }
181
182-- | Modify a 'Transport' to invoke an action upon every received packet.
183onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x
184onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr
185
186-- * Using a query\/response client.
187
188-- | Fork a thread that handles inbound packets. The returned action may be used
189-- to terminate the thread and clean up any related state.
190--
191-- Example usage:
192--
193-- > -- Start client.
194-- > quitServer <- forkListener "listener" (clientNet client)
195-- > -- Send a query q, recieve a response r.
196-- > r <- sendQuery client method q
197-- > -- Quit client.
198-- > quitServer
199forkListener :: String -> Transport err addr x -> IO (IO ())
200forkListener name client = do
201 thread_id <- forkIO $ do
202 myThreadId >>= flip labelThread ("listener."++name)
203 fix $ awaitMessage client . const
204 dput XMisc $ "Listener died: " ++ name
205 return $ do
206 closeTransport client
207 killThread thread_id
208
209asyncQuery_ :: Client err meth tid addr x
210 -> MethodSerializer tid addr x meth a b
211 -> a
212 -> addr
213 -> (Maybe b -> IO ())
214 -> IO (tid,POSIXTime,Int)
215asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do
216 now <- getPOSIXTime
217 (tid,addr,expiry) <- atomically $ do
218 tbl <- readTVar pending
219 ((tid,addr,expiry), tbl') <- dispatchRegister (tableMethods d)
220 (methodTimeout meth)
221 now
222 (withResponse . fmap (unwrapResponse meth))
223 addr0
224 tbl
225 -- (addr,expiry) <- methodTimeout meth tid addr0
226 writeTVar pending tbl'
227 return (tid,addr,expiry)
228 self <- whoami (Just addr)
229 mres <- do sendMessage net addr (wrapQuery meth tid self addr q)
230 return $ Just ()
231 `catchIOError` (\e -> return Nothing)
232 return (tid,now,expiry)
233
234asyncQuery :: Show meth => Client err meth tid addr x
235 -> MethodSerializer tid addr x meth a b
236 -> a
237 -> addr
238 -> (Maybe b -> IO ())
239 -> IO ()
240asyncQuery client meth q addr withResponse0 = do
241 tm <- getSystemTimerManager
242 tidvar <- newEmptyMVar
243 timedout <- registerTimeout tm 1000000 $ do
244 dput XMisc $ "async TIMEDOUT " ++ show (method meth)
245 withResponse0 Nothing
246 tid <- takeMVar tidvar
247 dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth)
248 case client of
249 Client { clientDispatcher = d, clientPending = pending } -> do
250 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
251 (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do
252 unregisterTimeout tm timedout
253 withResponse0 x
254 putMVar tidvar tid
255 updateTimeout tm timedout expiry
256 dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry
257
258-- | Send a query to a remote peer. Note that this function will always time
259-- out if 'forkListener' was never invoked to spawn a thread to receive and
260-- dispatch the response.
261sendQuery ::
262 forall err a b tbl x meth tid addr.
263 Client err meth tid addr x -- ^ A query/response implementation.
264 -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query.
265 -> a -- ^ The outbound query.
266 -> addr -- ^ Destination address of query.
267 -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out.
268sendQuery c@(Client net d err pending whoami _) meth q addr0 = do
269 mvar <- newEmptyMVar
270 (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar)
271 mres <- timeout expiry $ takeMVar mvar
272 case mres of
273 Just b -> return $ Just b
274 Nothing -> do
275 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
276 return Nothing
277
278-- * Implementing a query\/response 'Client'.
279
280-- | All inputs required to implement a query\/response client.
281data Client err meth tid addr x = forall tbl. Client
282 { -- | The 'Transport' used to dispatch and receive packets.
283 clientNet :: Transport err addr x
284 -- | Methods for handling inbound packets.
285 , clientDispatcher :: DispatchMethods tbl err meth tid addr x
286 -- | Methods for reporting various conditions.
287 , clientErrorReporter :: ErrorReporter addr x meth tid err
288 -- | State necessary for routing inbound responses and assigning unique
289 -- /tid/ values for outgoing queries.
290 , clientPending :: TVar tbl
291 -- | An action yielding this client\'s own address. It is invoked once
292 -- on each outbound and inbound packet. It is valid for this to always
293 -- return the same value.
294 --
295 -- The argument, if supplied, is the remote address for the transaction.
296 -- This can be used to maintain consistent aliases for specific peers.
297 , clientAddress :: Maybe addr -> IO addr
298 -- | Transform a query /tid/ value to an appropriate response /tid/
299 -- value. Normally, this would be the identity transformation, but if
300 -- /tid/ includes a unique cryptographic nonce, then it should be
301 -- generated here.
302 , clientResponseId :: tid -> IO tid
303 }
304
305-- | An incoming message can be classified into three cases.
306data MessageClass err meth tid addr x
307 = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response
308 -- should include the provided /tid/ value.
309 | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value.
310 | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked
311 -- with the source and destination address of a message. If it handles the
312 -- message, it should return Nothing. Otherwise, it should return a transform
313 -- (usually /id/) to apply before the next handler examines it.
314 | IsUnknown err -- ^ None of the above.
315
316-- | Handler for an inbound query of type /x/ from an address of type _addr_.
317data MethodHandler err tid addr x = forall a b. MethodHandler
318 { -- | Parse the query into a more specific type for this method.
319 methodParse :: x -> Either err a
320 -- | Serialize the response for transmission, given a context /ctx/ and the origin
321 -- and destination addresses.
322 , methodSerialize :: tid -> addr -> addr -> b -> x
323 -- | Fully typed action to perform upon the query. The remote origin
324 -- address of the query is provided to the handler.
325 , methodAction :: addr -> a -> IO b
326 }
327 -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary.
328 | forall a. NoReply
329 { -- | Parse the query into a more specific type for this method.
330 methodParse :: x -> Either err a
331 -- | Fully typed action to perform upon the query. The remote origin
332 -- address of the query is provided to the handler.
333 , noreplyAction :: addr -> a -> IO ()
334 }
335
336contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x
337contramapAddr f (MethodHandler p s a)
338 = MethodHandler
339 p
340 (\tid src dst result -> s tid (f src) (f dst) result)
341 (\addr arg -> a (f addr) arg)
342contramapAddr f (NoReply p a)
343 = NoReply p (\addr arg -> a (f addr) arg)
344
345
346-- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the
347-- parse is successful, the returned IO action will construct our reply if
348-- there is one. Otherwise, a parse err is returned.
349dispatchQuery :: MethodHandler err tid addr x -- ^ Handler to invoke.
350 -> tid -- ^ The transaction id for this query\/response session.
351 -> addr -- ^ Our own address, to which the query was sent.
352 -> x -- ^ The query packet.
353 -> addr -- ^ The origin address of the query.
354 -> Either err (IO (Maybe x))
355dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr =
356 fmap (\a -> Just . wrapR tid self addr <$> f addr a) $ unwrapQ x
357dispatchQuery (NoReply unwrapQ f) tid self x addr =
358 fmap (\a -> f addr a >> return Nothing) $ unwrapQ x
359
360-- | These four parameters are required to implement an outgoing query. A
361-- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that
362-- might be returned by 'lookupHandler'.
363data MethodSerializer tid addr x meth a b = MethodSerializer
364 { -- | Returns the microseconds to wait for a response to this query being
365 -- sent to the given address. The /addr/ may also be modified to add
366 -- routing information.
367 methodTimeout :: tid -> addr -> STM (addr,Int)
368 -- | A method identifier used for error reporting. This needn't be the
369 -- same as the /meth/ argument to 'MethodHandler', but it is suggested.
370 , method :: meth
371 -- | Serialize the outgoing query /a/ into a transmittable packet /x/.
372 -- The /addr/ arguments are, respectively, our own origin address and the
373 -- destination of the request. The /tid/ argument is useful for attaching
374 -- auxiliary notations on all outgoing packets.
375 , wrapQuery :: tid -> addr -> addr -> a -> x
376 -- | Parse an inbound packet /x/ into a response /b/ for this query.
377 , unwrapResponse :: x -> b
378 }
379
380
381-- | To dispatch responses to our outbound queries, we require three
382-- primitives. See the 'transactionMethods' function to create these
383-- primitives out of a lookup table and a generator for transaction ids.
384--
385-- The type variable /d/ is used to represent the current state of the
386-- transaction generator and the table of pending transactions.
387data TransactionMethods d tid addr x = TransactionMethods
388 {
389 -- | Before a query is sent, this function stores an 'MVar' to which the
390 -- response will be written too. The returned /tid/ is a transaction id
391 -- that can be used to forget the 'MVar' if the remote peer is not
392 -- responding.
393 dispatchRegister :: (tid -> addr -> STM (addr,Int)) -> POSIXTime -> (Maybe x -> IO ()) -> addr -> d -> STM ((tid,addr,Int), d)
394 -- | This method is invoked when an incoming packet /x/ indicates it is
395 -- a response to the transaction with id /tid/. The returned IO action
396 -- will write the packet to the correct 'MVar' thus completing the
397 -- dispatch.
398 , dispatchResponse :: tid -> x -> d -> STM (d, IO ())
399 -- | When a timeout interval elapses, this method is called to remove the
400 -- transaction from the table.
401 , dispatchCancel :: tid -> d -> STM d
402 }
403
404-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a
405-- function for generating unique transaction ids.
406transactionMethods ::
407 TableMethods t tid -- ^ Table methods to lookup values by /tid/.
408 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
409 -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x
410transactionMethods methods generate = transactionMethods' id id methods generate
411
412microsecondsDiff :: Int -> POSIXTime
413microsecondsDiff us = fromIntegral us / 1000000
414
415-- | Like 'transactionMethods' but allows extra information to be stored in the
416-- table of pending transactions. This also enables multiple 'Client's to
417-- share a single transaction table.
418transactionMethods' ::
419 ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry
420 -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry
421 -> TableMethods t tid -- ^ Table methods to lookup values by /tid/.
422 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
423 -> TransactionMethods (g,t a) tid addr x
424transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods
425 { dispatchCancel = \tid (g,t) -> return (g, delete tid t)
426 , dispatchRegister = \getTimeout now v a0 (g,t) -> do
427 let (tid,g') = generate g
428 (a,expiry) <- getTimeout tid a0
429 let t' = insert tid (store v) (now + microsecondsDiff expiry) t
430 return ( (tid,a,expiry), (g',t') )
431 , dispatchResponse = \tid x (g,t) ->
432 case lookup tid t of
433 Just v -> let t' = delete tid t
434 in return ((g,t'),void $ load v $ Just x)
435 Nothing -> return ((g,t), return ())
436 }
437
438-- | A set of methods necessary for dispatching incoming packets.
439data DispatchMethods tbl err meth tid addr x = DispatchMethods
440 { -- | Classify an inbound packet as a query or response.
441 classifyInbound :: x -> MessageClass err meth tid addr x
442 -- | Lookup the handler for a inbound query.
443 , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x)
444 -- | Methods for handling incoming responses.
445 , tableMethods :: TransactionMethods tbl tid addr x
446 }
447
448-- | These methods indicate what should be done upon various conditions. Write
449-- to a log file, make debug prints, or simply ignore them.
450--
451-- [ /addr/ ] Address of remote peer.
452--
453-- [ /x/ ] Incoming or outgoing packet.
454--
455-- [ /meth/ ] Method id of incoming or outgoing request.
456--
457-- [ /tid/ ] Transaction id for outgoing packet.
458--
459-- [ /err/ ] Error information, typically a 'String'.
460data ErrorReporter addr x meth tid err = ErrorReporter
461 { -- | Incoming: failed to parse packet.
462 reportParseError :: err -> IO ()
463 -- | Incoming: no handler for request.
464 , reportMissingHandler :: meth -> addr -> x -> IO ()
465 -- | Incoming: unable to identify request.
466 , reportUnknown :: addr -> x -> err -> IO ()
467 }
468
469ignoreErrors :: ErrorReporter addr x meth tid err
470ignoreErrors = ErrorReporter
471 { reportParseError = \_ -> return ()
472 , reportMissingHandler = \_ _ _ -> return ()
473 , reportUnknown = \_ _ _ -> return ()
474 }
475
476logErrors :: ( Show addr
477 , Show meth
478 ) => ErrorReporter addr x meth tid String
479logErrors = ErrorReporter
480 { reportParseError = \err -> dput XMisc err
481 , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")"
482 , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err
483 }
484
485printErrors :: ( Show addr
486 , Show meth
487 ) => Handle -> ErrorReporter addr x meth tid String
488printErrors h = ErrorReporter
489 { reportParseError = \err -> hPutStrLn h err
490 , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")"
491 , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err
492 }
493
494-- Change the /err/ type for an 'ErrorReporter'.
495instance Contravariant (ErrorReporter addr x meth tid) where
496 -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5
497 contramap f (ErrorReporter pe mh unk)
498 = ErrorReporter (\e -> pe (f e))
499 mh
500 (\addr x e -> unk addr x (f e))
501
502-- | Handle a single inbound packet and then invoke the given continuation.
503-- The 'forkListener' function is implemented by passing this function to 'fix'
504-- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or
505-- throws an exception.
506handleMessage ::
507 Client err meth tid addr x
508 -> addr
509 -> x
510 -> IO (Maybe (x -> x))
511handleMessage (Client net d err pending whoami responseID) addr plain = do
512 -- Just (Left e) -> do reportParseError err e
513 -- return $! Just id
514 -- Just (Right (plain, addr)) -> do
515 case classifyInbound d plain of
516 IsQuery meth tid -> case lookupHandler d meth of
517 Nothing -> do reportMissingHandler err meth addr plain
518 return $! Just id
519 Just m -> do
520 self <- whoami (Just addr)
521 tid' <- responseID tid
522 either (\e -> do reportParseError err e
523 return $! Just id)
524 (>>= \m -> do mapM_ (sendMessage net addr) m
525 return $! Nothing)
526 (dispatchQuery m tid' self plain addr)
527 IsUnsolicited action -> do
528 self <- whoami (Just addr)
529 action self addr
530 return Nothing
531 IsResponse tid -> do
532 action <- atomically $ do
533 ts0 <- readTVar pending
534 (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0
535 writeTVar pending ts
536 return action
537 action
538 return $! Nothing
539 IsUnknown e -> do reportUnknown err addr plain e
540 return $! Just id
541 -- Nothing -> return $! id
542
543-- * UDP Datagrams.
544
545-- | Access the address family of a given 'SockAddr'. This convenient accessor
546-- is missing from 'Network.Socket', so I implemented it here.
547sockAddrFamily :: SockAddr -> Family
548sockAddrFamily (SockAddrInet _ _ ) = AF_INET
549sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
550sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
551sockAddrFamily _ = AF_CAN -- SockAddrCan constructor deprecated
552
553-- | Packets with an empty payload may trigger EOF exception.
554-- 'udpTransport' uses this function to avoid throwing in that
555-- case.
556ignoreEOF :: a -> IOError -> IO a
557ignoreEOF def e | isEOFError e = pure def
558 | otherwise = throwIO e
559
560-- | Hard-coded maximum packet size for incoming UDP Packets received via
561-- 'udpTransport'.
562udpBufferSize :: Int
563udpBufferSize = 65536
564
565-- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError.
566saferSendTo :: Socket -> ByteString -> SockAddr -> IO ()
567saferSendTo sock bs saddr = void (B.sendTo sock bs saddr)
568 `catch` \e ->
569 -- sendTo: does not exist (Network is unreachable)
570 -- Occurs when IPv6 or IPv4 network is not available.
571 -- Currently, we require -threaded to prevent a forever-hang in this case.
572 if isDoesNotExistError e
573 then return ()
574 else throw e
575
576-- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The
577-- argument is the listen-address for incoming packets. This is a useful
578-- low-level 'Transport' that can be transformed for higher-level protocols
579-- using 'layerTransport'.
580udpTransport :: SockAddr -> IO (Transport err SockAddr ByteString)
581udpTransport bind_address = fst <$> udpTransport' bind_address
582
583-- | Like 'udpTransport' except also returns the raw socket (for broadcast use).
584udpTransport' :: SockAddr -> IO (Transport err SockAddr ByteString, Socket)
585udpTransport' bind_address = do
586 let family = sockAddrFamily bind_address
587 sock <- socket family Datagram defaultProtocol
588 when (family == AF_INET6) $ do
589 setSocketOption sock IPv6Only 0
590 setSocketOption sock Broadcast 1
591 bind sock bind_address
592 let tr = Transport {
593 awaitMessage = \kont -> do
594 r <- handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do
595 Just . Right <$!> B.recvFrom sock udpBufferSize
596 kont $! r
597 , sendMessage = case family of
598 AF_INET6 -> \case
599 (SockAddrInet port addr) -> \bs ->
600 -- Change IPv4 to 4mapped6 address.
601 saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0
602 addr6 -> \bs -> saferSendTo sock bs addr6
603 AF_INET -> \case
604 (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do
605 let host4 = toBE32 raw4
606 -- Change 4mapped6 to ordinary IPv4.
607 -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4)
608 saferSendTo sock bs (SockAddrInet port host4)
609 addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr)
610 addr4 -> \bs -> saferSendTo sock bs addr4
611 _ -> \addr bs -> saferSendTo sock bs addr
612 , closeTransport = close sock
613 }
614 return (tr, sock)
615
616chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x
617chanTransport chanFromAddr self achan aclosed = Transport
618 { awaitMessage = \kont -> do
619 x <- atomically $ (Just <$> readTChan achan)
620 `orElse`
621 (readTVar aclosed >>= check >> return Nothing)
622 kont $ Right <$> x
623 , sendMessage = \them bs -> do
624 atomically $ writeTChan (chanFromAddr them) (bs,self)
625 , closeTransport = atomically $ writeTVar aclosed True
626 }
627
628-- | Returns a pair of transports linked together to simulate two computers talking to each other.
629testPairTransport :: IO (Transport err SockAddr ByteString, Transport err SockAddr ByteString)
630testPairTransport = do
631 achan <- atomically newTChan
632 bchan <- atomically newTChan
633 aclosed <- atomically $ newTVar False
634 bclosed <- atomically $ newTVar False
635 let a = SockAddrInet 1 1
636 b = SockAddrInet 2 2
637 return ( chanTransport (const bchan) a achan aclosed
638 , chanTransport (const achan) b bchan bclosed )