summaryrefslogtreecommitdiff
path: root/server/src/Network/QueryResponse.hs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/Network/QueryResponse.hs')
-rw-r--r--server/src/Network/QueryResponse.hs716
1 files changed, 716 insertions, 0 deletions
diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs
new file mode 100644
index 00000000..20e7ecf0
--- /dev/null
+++ b/server/src/Network/QueryResponse.hs
@@ -0,0 +1,716 @@
1-- | This module can implement any query\/response protocol. It was written
2-- with Kademlia implementations in mind.
3
4{-# LANGUAGE CPP #-}
5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE ScopedTypeVariables #-}
10{-# LANGUAGE TupleSections #-}
11module Network.QueryResponse where
12
13#ifdef THREAD_DEBUG
14import Control.Concurrent.Lifted.Instrument
15#else
16import Control.Concurrent
17import GHC.Conc (labelThread)
18#endif
19import Control.Concurrent.STM
20import Control.Exception
21import Control.Monad
22import qualified Data.ByteString as B
23 ;import Data.ByteString (ByteString)
24import Data.Dependent.Map as DMap
25import Data.Dependent.Sum
26import Data.Function
27import Data.Functor.Contravariant
28import Data.Functor.Identity
29import Data.GADT.Show
30import qualified Data.IntMap.Strict as IntMap
31 ;import Data.IntMap.Strict (IntMap)
32import qualified Data.Map.Strict as Map
33 ;import Data.Map.Strict (Map)
34import Data.Time.Clock.POSIX
35import qualified Data.Word64Map as W64Map
36 ;import Data.Word64Map (Word64Map)
37import Data.Word
38import Data.Maybe
39import GHC.Conc (closeFdWith)
40import GHC.Event
41import Network.Socket
42import Network.Socket.ByteString as B
43import System.Endian
44import System.IO
45import System.IO.Error
46import System.Timeout
47
48import DPut
49import DebugTag
50import Data.TableMethods
51
52-- | An inbound packet or condition raised while monitoring a connection.
53data Arrival err addr x
54 = Terminated -- ^ Virtual message that signals EOF.
55 | ParseError !err -- ^ A badly-formed message was received.
56 | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message.
57
58-- | Three methods are required to implement a datagram based query\/response protocol.
59data TransportA err addr x y = Transport
60 { -- | Blocks until an inbound packet is available. Then calls the provided
61 -- continuation with the packet and origin addres or an error condition.
62 awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a)
63 -- | Send an /y/ packet to the given destination /addr/.
64 , sendMessage :: addr -> y -> IO ()
65 -- | Shutdown and clean up any state related to this 'Transport'.
66 , setActive :: Bool -> IO ()
67 }
68
69type Transport err addr x = TransportA err addr x x
70
71closeTransport :: TransportA err addr x y -> IO ()
72closeTransport tr = setActive tr False
73
74-- | This function modifies a 'Transport' to use higher-level addresses and
75-- packet representations. It could be used to change UDP 'ByteString's into
76-- bencoded syntax trees or to add an encryption layer in which addresses have
77-- associated public keys.
78layerTransportM ::
79 (x -> addr -> IO (Either err (x', addr')))
80 -- ^ Function that attempts to transform a low-level address/packet
81 -- pair into a higher level representation.
82 -> (y' -> addr' -> IO (y, addr))
83 -- ^ Function to encode a high-level address/packet into a lower level
84 -- representation.
85 -> TransportA err addr x y
86 -- ^ The low-level transport to be transformed.
87 -> TransportA err addr' x' y'
88layerTransportM parse encode tr =
89 tr { awaitMessage = \kont ->
90 awaitMessage tr $ \case
91 Terminated -> kont $ Terminated
92 ParseError e -> kont $ ParseError e
93 Arrival addr x -> parse x addr >>= \case
94 Left e -> kont $ ParseError e
95 Right (x',addr') -> kont $ Arrival addr' x'
96 , sendMessage = \addr' msg' -> do
97 (msg,addr) <- encode msg' addr'
98 sendMessage tr addr msg
99 }
100
101
102-- | This function modifies a 'Transport' to use higher-level addresses and
103-- packet representations. It could be used to change UDP 'ByteString's into
104-- bencoded syntax trees or to add an encryption layer in which addresses have
105-- associated public keys.
106layerTransport ::
107 (x -> addr -> Either err (x', addr'))
108 -- ^ Function that attempts to transform a low-level address/packet
109 -- pair into a higher level representation.
110 -> (y' -> addr' -> (y, addr))
111 -- ^ Function to encode a high-level address/packet into a lower level
112 -- representation.
113 -> TransportA err addr x y
114 -- ^ The low-level transport to be transformed.
115 -> TransportA err addr' x' y'
116layerTransport parse encode tr =
117 layerTransportM (\x addr -> return $ parse x addr)
118 (\x' addr' -> return $ encode x' addr')
119 tr
120
121-- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan'
122-- is used to share the same underlying socket, so be sure to fork a thread for
123-- both returned 'Transport's to avoid hanging.
124partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
125 -> ((y,xaddr) -> IO (Maybe (c,a)))
126 -> TransportA err a b c
127 -> IO (TransportA err xaddr x y, TransportA err a b c)
128partitionTransportM parse encodex tr = do
129 tchan <- atomically newTChan
130 let ytr = tr { awaitMessage = \kont -> fix $ \again -> do
131 awaitMessage tr $ \m -> case m of
132 Arrival adr msg -> parse (msg,adr) >>= \case
133 Left x -> atomically (writeTChan tchan (Just x)) >> join (atomically again)
134 Right (y,yaddr) -> kont $ Arrival yaddr y
135 ParseError e -> kont $ ParseError e
136 Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated
137 , sendMessage = sendMessage tr
138 }
139 xtr = Transport
140 { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case
141 Nothing -> Terminated
142 Just (x,xaddr) -> Arrival xaddr x
143 , sendMessage = \addr' msg' -> do
144 msg_addr <- encodex (msg',addr')
145 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
146 , setActive = const $ return ()
147 }
148 return (xtr, ytr)
149
150-- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan'
151-- is used to share the same underlying socket, so be sure to fork a thread for
152-- both returned 'Transport's to avoid hanging.
153partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a))
154 -> ((y,xaddr) -> Maybe (c,a))
155 -> TransportA err a b c
156 -> IO (TransportA err xaddr x y, TransportA err a b c)
157partitionTransport parse encodex tr =
158 partitionTransportM (return . parse) (return . encodex) tr
159
160-- |
161-- * f add x --> Nothing, consume x
162-- --> Just id, leave x to a different handler
163-- --> Just g, apply g to x and leave that to a different handler
164--
165-- Note: If you add a handler to one of the branches before applying a
166-- 'mergeTransports' combinator, then this handler may not block or return
167-- Nothing.
168addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> TransportA err addr x y -> TransportA err addr x y
169addHandler onParseError f tr = tr
170 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case
171 Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x))
172 ParseError e -> onParseError e >> kont (ParseError e)
173 Terminated -> kont Terminated
174 }
175
176-- | Modify a 'Transport' to invoke an action upon every received packet.
177onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x
178onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr
179
180-- * Using a query\/response client.
181
182-- | Fork a thread that handles inbound packets. The returned action may be used
183-- to terminate the thread and clean up any related state.
184--
185-- Example usage:
186--
187-- > -- Start client.
188-- > quitServer <- forkListener "listener" (clientNet client)
189-- > -- Send a query q, recieve a response r.
190-- > r <- sendQuery client method q
191-- > -- Quit client.
192-- > quitServer
193forkListener :: String -> Transport err addr x -> IO (IO ())
194forkListener name client = do
195 setActive client True
196 thread_id <- forkIO $ do
197 myThreadId >>= flip labelThread ("listener."++name)
198 fix $ \loop -> join $ atomically $ awaitMessage client $ \case
199 Terminated -> return ()
200 _ -> loop
201 dput XMisc $ "Listener died: " ++ name
202 return $ do
203 setActive client False
204 -- killThread thread_id
205
206-- * Implementing a query\/response 'Client'.
207
208-- | These methods indicate what should be done upon various conditions. Write
209-- to a log file, make debug prints, or simply ignore them.
210--
211-- [ /addr/ ] Address of remote peer.
212--
213-- [ /x/ ] Incoming or outgoing packet.
214--
215-- [ /meth/ ] Method id of incoming or outgoing request.
216--
217-- [ /tid/ ] Transaction id for outgoing packet.
218--
219-- [ /err/ ] Error information, typically a 'String'.
220data ErrorReporter addr x meth tid err = ErrorReporter
221 { -- | Incoming: failed to parse packet.
222 reportParseError :: err -> IO ()
223 -- | Incoming: no handler for request.
224 , reportMissingHandler :: meth -> addr -> x -> IO ()
225 -- | Incoming: unable to identify request.
226 , reportUnknown :: addr -> x -> err -> IO ()
227 }
228
229ignoreErrors :: ErrorReporter addr x meth tid err
230ignoreErrors = ErrorReporter
231 { reportParseError = \_ -> return ()
232 , reportMissingHandler = \_ _ _ -> return ()
233 , reportUnknown = \_ _ _ -> return ()
234 }
235
236logErrors :: ( Show addr
237 , Show meth
238 ) => ErrorReporter addr x meth tid String
239logErrors = ErrorReporter
240 { reportParseError = \err -> dput XMisc err
241 , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")"
242 , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err
243 }
244
245printErrors :: ( Show addr
246 , Show meth
247 ) => Handle -> ErrorReporter addr x meth tid String
248printErrors h = ErrorReporter
249 { reportParseError = \err -> hPutStrLn h err
250 , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")"
251 , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err
252 }
253
254-- Change the /err/ type for an 'ErrorReporter'.
255instance Contravariant (ErrorReporter addr x meth tid) where
256 -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5
257 contramap f (ErrorReporter pe mh unk)
258 = ErrorReporter (\e -> pe (f e))
259 mh
260 (\addr x e -> unk addr x (f e))
261
262-- | An incoming message can be classified into three cases.
263data MessageClass err meth tid addr x
264 = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response
265 -- should include the provided /tid/ value.
266 | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value.
267 | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked
268 -- with the source and destination address of a message. If it handles the
269 -- message, it should return Nothing. Otherwise, it should return a transform
270 -- (usually /id/) to apply before the next handler examines it.
271 | IsUnknown err -- ^ None of the above.
272
273-- | Handler for an inbound query of type /x/ from an address of type _addr_.
274type MethodHandler err tid addr x = MethodHandlerA err tid addr x x
275
276-- | Handler for an inbound query of type /x/ with outbound response of type
277-- /y/ to an address of type /addr/.
278data MethodHandlerA err tid addr x y = forall a b. MethodHandler
279 { -- | Parse the query into a more specific type for this method.
280 methodParse :: x -> Either err a
281 -- | Serialize the response for transmission, given a context /ctx/ and the origin
282 -- and destination addresses.
283 , methodSerialize :: tid -> addr -> addr -> b -> y
284 -- | Fully typed action to perform upon the query. The remote origin
285 -- address of the query is provided to the handler.
286 --
287 -- TODO: Allow queries to be ignored?
288 , methodAction :: addr -> a -> IO b
289 }
290 -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary.
291 | forall a. NoReply
292 { -- | Parse the query into a more specific type for this method.
293 methodParse :: x -> Either err a
294 -- | Fully typed action to perform upon the query. The remote origin
295 -- address of the query is provided to the handler.
296 , noreplyAction :: addr -> a -> IO ()
297 }
298
299
300-- | To dispatch responses to our outbound queries, we require three
301-- primitives. See the 'transactionMethods' function to create these
302-- primitives out of a lookup table and a generator for transaction ids.
303--
304-- The type variable /d/ is used to represent the current state of the
305-- transaction generator and the table of pending transactions.
306data TransactionMethods d qid addr x = TransactionMethods
307 {
308 -- | Before a query is sent, this function stores an 'MVar' to which the
309 -- response will be written too. The returned /qid/ is a transaction id
310 -- that can be used to forget the 'MVar' if the remote peer is not
311 -- responding.
312 dispatchRegister :: POSIXTime -- time of expiry
313 -> (Maybe x -> IO ()) -- callback upon response (or timeout)
314 -> addr
315 -> d
316 -> STM (qid, d)
317 -- | This method is invoked when an incoming packet /x/ indicates it is
318 -- a response to the transaction with id /qid/. The returned IO action
319 -- will write the packet to the correct 'MVar' thus completing the
320 -- dispatch.
321 , dispatchResponse :: qid -> x -> d -> STM (d, IO ())
322 -- | When a timeout interval elapses, this method is called to remove the
323 -- transaction from the table.
324 , dispatchCancel :: qid -> d -> STM d
325 }
326
327-- | A set of methods necessary for dispatching incoming packets.
328type DispatchMethods tbl err meth tid addr x = DispatchMethodsA tbl err meth tid addr x x
329
330-- | A set of methods necessary for dispatching incoming packets.
331data DispatchMethodsA tbl err meth tid addr x y = DispatchMethods
332 { -- | Classify an inbound packet as a query or response.
333 classifyInbound :: x -> MessageClass err meth tid addr x
334 -- | Lookup the handler for a inbound query.
335 , lookupHandler :: meth -> Maybe (MethodHandlerA err tid addr x y)
336 -- | Methods for handling incoming responses.
337 , tableMethods :: TransactionMethods tbl tid addr x
338 }
339
340-- | All inputs required to implement a query\/response client.
341type Client err meth tid addr x = ClientA err meth tid addr x x
342
343-- | All inputs required to implement a query\/response client.
344data ClientA err meth tid addr x y = forall tbl. Client
345 { -- | The 'Transport' used to dispatch and receive packets.
346 clientNet :: TransportA err addr x y
347 -- | Methods for handling inbound packets.
348 , clientDispatcher :: DispatchMethodsA tbl err meth tid addr x y
349 -- | Methods for reporting various conditions.
350 , clientErrorReporter :: ErrorReporter addr x meth tid err
351 -- | State necessary for routing inbound responses and assigning unique
352 -- /tid/ values for outgoing queries.
353 , clientPending :: TVar tbl
354 -- | An action yielding this client\'s own address. It is invoked once
355 -- on each outbound and inbound packet. It is valid for this to always
356 -- return the same value.
357 --
358 -- The argument, if supplied, is the remote address for the transaction.
359 -- This can be used to maintain consistent aliases for specific peers.
360 , clientAddress :: Maybe addr -> IO addr
361 -- | Transform a query /tid/ value to an appropriate response /tid/
362 -- value. Normally, this would be the identity transformation, but if
363 -- /tid/ includes a unique cryptographic nonce, then it should be
364 -- generated here.
365 , clientResponseId :: tid -> IO tid
366 }
367
368-- | These four parameters are required to implement an outgoing query. A
369-- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that
370-- might be returned by 'lookupHandler'.
371data MethodSerializerA tid addr x y meth a b = MethodSerializer
372 { -- | Returns the microseconds to wait for a response to this query being
373 -- sent to the given address. The /addr/ may also be modified to add
374 -- routing information.
375 methodTimeout :: addr -> STM (addr,Int)
376 -- | A method identifier used for error reporting. This needn't be the
377 -- same as the /meth/ argument to 'MethodHandler', but it is suggested.
378 , method :: meth
379 -- | Serialize the outgoing query /a/ into a transmittable packet /x/.
380 -- The /addr/ arguments are, respectively, our own origin address and the
381 -- destination of the request. The /tid/ argument is useful for attaching
382 -- auxiliary notations on all outgoing packets.
383 , wrapQuery :: tid -> addr -> addr -> a -> x
384 -- | Parse an inbound packet /x/ into a response /b/ for this query.
385 , unwrapResponse :: y -> b
386 }
387
388type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth a b
389
390microsecondsDiff :: Int -> POSIXTime
391microsecondsDiff us = fromIntegral us / 1000000
392
393asyncQuery_ :: Client err meth tid addr x
394 -> MethodSerializer tid addr x meth a b
395 -> a
396 -> addr
397 -> (Maybe b -> IO ())
398 -> IO (tid,POSIXTime,Int)
399asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do
400 now <- getPOSIXTime
401 (tid,addr,expiry) <- atomically $ do
402 tbl <- readTVar pending
403 (addr,expiry) <- methodTimeout meth addr0
404 (tid, tbl') <- dispatchRegister (tableMethods d)
405 (now + microsecondsDiff expiry)
406 (withResponse . fmap (unwrapResponse meth))
407 addr -- XXX: Should be addr0 or addr?
408 tbl
409 -- (addr,expiry) <- methodTimeout meth tid addr0
410 writeTVar pending tbl'
411 return (tid,addr,expiry)
412 self <- whoami (Just addr)
413 mres <- do sendMessage net addr (wrapQuery meth tid self addr q)
414 return $ Just ()
415 `catchIOError` (\e -> return Nothing)
416 return (tid,now,expiry)
417
418asyncQuery :: Show meth => Client err meth tid addr x
419 -> MethodSerializer tid addr x meth a b
420 -> a
421 -> addr
422 -> (Maybe b -> IO ())
423 -> IO ()
424asyncQuery client meth q addr withResponse0 = do
425 tm <- getSystemTimerManager
426 tidvar <- newEmptyMVar
427 timedout <- registerTimeout tm 1000000 $ do
428 dput XMisc $ "async TIMEDOUT " ++ show (method meth)
429 withResponse0 Nothing
430 tid <- takeMVar tidvar
431 dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth)
432 case client of
433 Client { clientDispatcher = d, clientPending = pending } -> do
434 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
435 (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do
436 unregisterTimeout tm timedout
437 withResponse0 x
438 putMVar tidvar tid
439 updateTimeout tm timedout expiry
440 dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry
441
442-- | Send a query to a remote peer. Note that this function will always time
443-- out if 'forkListener' was never invoked to spawn a thread to receive and
444-- dispatch the response.
445sendQuery ::
446 forall err a b tbl x meth tid addr.
447 Client err meth tid addr x -- ^ A query/response implementation.
448 -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query.
449 -> a -- ^ The outbound query.
450 -> addr -- ^ Destination address of query.
451 -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out.
452sendQuery c@(Client net d err pending whoami _) meth q addr0 = do
453 mvar <- newEmptyMVar
454 (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar)
455 mres <- timeout expiry $ takeMVar mvar
456 case mres of
457 Just b -> return $ Just b
458 Nothing -> do
459 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
460 return Nothing
461
462contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x
463contramapAddr f (MethodHandler p s a)
464 = MethodHandler
465 p
466 (\tid src dst result -> s tid (f src) (f dst) result)
467 (\addr arg -> a (f addr) arg)
468contramapAddr f (NoReply p a)
469 = NoReply p (\addr arg -> a (f addr) arg)
470
471-- | Query handlers can throw this to ignore a query instead of responding to
472-- it.
473data DropQuery = DropQuery
474 deriving Show
475
476instance Exception DropQuery
477
478-- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the
479-- parse is successful, the returned IO action will construct our reply if
480-- there is one. Otherwise, a parse err is returned.
481dispatchQuery :: MethodHandlerA err tid addr x y -- ^ Handler to invoke.
482 -> tid -- ^ The transaction id for this query\/response session.
483 -> addr -- ^ Our own address, to which the query was sent.
484 -> x -- ^ The query packet.
485 -> addr -- ^ The origin address of the query.
486 -> Either err (IO (Maybe y))
487dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr =
488 fmap (\a -> catch (Just . wrapR tid self addr <$> f addr a)
489 (\DropQuery -> return Nothing))
490 $ unwrapQ x
491dispatchQuery (NoReply unwrapQ f) tid self x addr =
492 fmap (\a -> f addr a >> return Nothing) $ unwrapQ x
493
494-- | Like 'transactionMethods' but allows extra information to be stored in the
495-- table of pending transactions. This also enables multiple 'Client's to
496-- share a single transaction table.
497transactionMethods' ::
498 ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry
499 -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry
500 -> TableMethods t tid -- ^ Table methods to lookup values by /tid/.
501 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
502 -> TransactionMethods (g,t a) tid addr x
503transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods
504 { dispatchCancel = \tid (g,t) -> return (g, delete tid t)
505 , dispatchRegister = \nowPlusExpiry v a (g,t) -> do
506 let (tid,g') = generate g
507 let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t
508 return ( tid, (g',t') )
509 , dispatchResponse = \tid x (g,t) ->
510 case lookup tid t of
511 Just v -> let t' = delete tid t
512 in return ((g,t'),void $ load v $ Just x)
513 Nothing -> return ((g,t), return ())
514 }
515
516-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a
517-- function for generating unique transaction ids.
518transactionMethods ::
519 TableMethods t tid -- ^ Table methods to lookup values by /tid/.
520 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
521 -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x
522transactionMethods methods generate = transactionMethods' id id methods generate
523
524-- | Handle a single inbound packet and then invoke the given continuation.
525-- The 'forkListener' function is implemented by passing this function to 'fix'
526-- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or
527-- throws an exception.
528handleMessage ::
529 ClientA err meth tid addr x y
530 -> addr
531 -> x
532 -> IO (Maybe (x -> x))
533handleMessage (Client net d err pending whoami responseID) addr plain = do
534 -- Just (Left e) -> do reportParseError err e
535 -- return $! Just id
536 -- Just (Right (plain, addr)) -> do
537 case classifyInbound d plain of
538 IsQuery meth tid -> case lookupHandler d meth of
539 Nothing -> do reportMissingHandler err meth addr plain
540 return $! Just id
541 Just m -> do
542 self <- whoami (Just addr)
543 tid' <- responseID tid
544 either (\e -> do reportParseError err e
545 return $! Just id)
546 (>>= \m -> do mapM_ (sendMessage net addr) m
547 return $! Nothing)
548 (dispatchQuery m tid' self plain addr)
549 IsUnsolicited action -> do
550 self <- whoami (Just addr)
551 action self addr
552 return Nothing
553 IsResponse tid -> do
554 action <- atomically $ do
555 ts0 <- readTVar pending
556 (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0
557 writeTVar pending ts
558 return action
559 action
560 return $! Nothing
561 IsUnknown e -> do reportUnknown err addr plain e
562 return $! Just id
563 -- Nothing -> return $! id
564
565-- * UDP Datagrams.
566
567-- | Access the address family of a given 'SockAddr'. This convenient accessor
568-- is missing from 'Network.Socket', so I implemented it here.
569sockAddrFamily :: SockAddr -> Family
570sockAddrFamily (SockAddrInet _ _ ) = AF_INET
571sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
572sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
573#if !MIN_VERSION_network(3,0,0)
574sockAddrFamily _ = AF_CAN -- SockAddrCan constructor deprecated
575#endif
576
577-- | Packets with an empty payload may trigger EOF exception.
578-- 'udpTransport' uses this function to avoid throwing in that
579-- case.
580ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x)
581ignoreEOF sock isClosed def e = do
582 done <- tryReadMVar isClosed
583 case done of
584 Just () -> do close sock
585 dput XMisc "Closing UDP socket."
586 pure Terminated
587 _ -> if isEOFError e then pure def
588 else throwIO e
589
590-- | Hard-coded maximum packet size for incoming UDP Packets received via
591-- 'udpTransport'.
592udpBufferSize :: Int
593udpBufferSize = 65536
594
595-- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError.
596saferSendTo :: Socket -> ByteString -> SockAddr -> IO ()
597saferSendTo sock bs saddr = void (B.sendTo sock bs saddr)
598 `catch` \e ->
599 -- sendTo: does not exist (Network is unreachable)
600 -- Occurs when IPv6 or IPv4 network is not available.
601 -- Currently, we require -threaded to prevent a forever-hang in this case.
602 if isDoesNotExistError e
603 then return ()
604 else throw e
605
606-- | Like 'udpTransport' except also returns the raw socket (for broadcast use).
607udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket)
608udpTransport' bind_address = do
609 let family = sockAddrFamily bind_address
610 sock <- socket family Datagram defaultProtocol
611 when (family == AF_INET6) $ do
612 setSocketOption sock IPv6Only 0
613 setSocketOption sock Broadcast 1
614 bind sock bind_address
615 isClosed <- newEmptyMVar
616 udpTChan <- atomically newTChan
617 let tr = Transport {
618 awaitMessage = \kont -> do
619 r <- readTChan udpTChan
620 return $ kont $! r
621 , sendMessage = case family of
622 AF_INET6 -> \case
623 (SockAddrInet port addr) -> \bs ->
624 -- Change IPv4 to 4mapped6 address.
625 saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0
626 addr6 -> \bs -> saferSendTo sock bs addr6
627 AF_INET -> \case
628 (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do
629 let host4 = toBE32 raw4
630 -- Change 4mapped6 to ordinary IPv4.
631 -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4)
632 saferSendTo sock bs (SockAddrInet port host4)
633 addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr)
634 addr4 -> \bs -> saferSendTo sock bs addr4
635 _ -> \addr bs -> saferSendTo sock bs addr
636 , setActive = \case
637 False -> do
638 dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address
639 tryPutMVar isClosed () -- signal awaitMessage that the transport is closed.
640#if MIN_VERSION_network (3,1,0)
641#elif MIN_VERSION_network(3,0,0)
642 let withFdSocket sock f = fdSocket sock >>= f >>= seq sock . return
643#else
644 let withFdSocket sock f = f (fdSocket sock) >>= seq sock . return
645#endif
646 withFdSocket sock $ \fd -> do
647 let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return ()
648 -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage.
649 closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd)
650 True -> do
651 udpThread <- forkIO $ fix $ \again -> do
652 r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do
653 uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize
654 atomically $ writeTChan udpTChan r
655 case r of Terminated -> return ()
656 _ -> again
657 labelThread udpThread ("udp.io."++show bind_address)
658 }
659 return (tr, sock)
660
661-- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The
662-- argument is the listen-address for incoming packets. This is a useful
663-- low-level 'Transport' that can be transformed for higher-level protocols
664-- using 'layerTransport'.
665udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString)
666udpTransport bind_address = fst <$> udpTransport' bind_address
667
668chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x
669chanTransport chanFromAddr self achan aclosed = Transport
670 { awaitMessage = \kont -> do
671 x <- (uncurry (flip Arrival) <$> readTChan achan)
672 `orElse`
673 (readTVar aclosed >>= check >> return Terminated)
674 return $ kont x
675 , sendMessage = \them bs -> do
676 atomically $ writeTChan (chanFromAddr them) (bs,self)
677 , setActive = \case
678 False -> atomically $ writeTVar aclosed True
679 True -> return ()
680 }
681
682-- | Returns a pair of transports linked together to simulate two computers talking to each other.
683testPairTransport :: IO (Transport err SockAddr ByteString, Transport err SockAddr ByteString)
684testPairTransport = do
685 achan <- atomically newTChan
686 bchan <- atomically newTChan
687 aclosed <- atomically $ newTVar False
688 bclosed <- atomically $ newTVar False
689 let a = SockAddrInet 1 1
690 b = SockAddrInet 2 2
691 return ( chanTransport (const bchan) a achan aclosed
692 , chanTransport (const achan) b bchan bclosed )
693
694newtype ByAddress err x addr = ByAddress (Transport err addr x)
695
696newtype Tagged x addr = Tagged x
697
698decorateAddr :: tag addr -> Arrival e addr x -> Arrival e (DSum tag Identity) x
699decorateAddr tag Terminated = Terminated
700decorateAddr tag (ParseError e) = ParseError e
701decorateAddr tag (Arrival addr x) = Arrival (tag ==> addr) x
702
703mergeTransports :: GCompare tag => DMap tag (ByAddress err x) -> IO (Transport err (DSum tag Identity) x)
704mergeTransports tmap = do
705 -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap
706 -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap
707 return Transport
708 { awaitMessage = \kont ->
709 foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n)
710 retry
711 tmap
712 , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of
713 Just (ByAddress tr) -> sendMessage tr addr x
714 Nothing -> return ()
715 , setActive = \toggle -> foldrWithKey (\_ (ByAddress tr) next -> setActive tr toggle >> next) (return ()) tmap
716 }