diff options
Diffstat (limited to 'dht/src/Network/QueryResponse.hs')
-rw-r--r-- | dht/src/Network/QueryResponse.hs | 716 |
1 files changed, 0 insertions, 716 deletions
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs deleted file mode 100644 index 20e7ecf0..00000000 --- a/dht/src/Network/QueryResponse.hs +++ /dev/null | |||
@@ -1,716 +0,0 @@ | |||
1 | -- | This module can implement any query\/response protocol. It was written | ||
2 | -- with Kademlia implementations in mind. | ||
3 | |||
4 | {-# LANGUAGE CPP #-} | ||
5 | {-# LANGUAGE GADTs #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | {-# LANGUAGE PartialTypeSignatures #-} | ||
8 | {-# LANGUAGE RankNTypes #-} | ||
9 | {-# LANGUAGE ScopedTypeVariables #-} | ||
10 | {-# LANGUAGE TupleSections #-} | ||
11 | module Network.QueryResponse where | ||
12 | |||
13 | #ifdef THREAD_DEBUG | ||
14 | import Control.Concurrent.Lifted.Instrument | ||
15 | #else | ||
16 | import Control.Concurrent | ||
17 | import GHC.Conc (labelThread) | ||
18 | #endif | ||
19 | import Control.Concurrent.STM | ||
20 | import Control.Exception | ||
21 | import Control.Monad | ||
22 | import qualified Data.ByteString as B | ||
23 | ;import Data.ByteString (ByteString) | ||
24 | import Data.Dependent.Map as DMap | ||
25 | import Data.Dependent.Sum | ||
26 | import Data.Function | ||
27 | import Data.Functor.Contravariant | ||
28 | import Data.Functor.Identity | ||
29 | import Data.GADT.Show | ||
30 | import qualified Data.IntMap.Strict as IntMap | ||
31 | ;import Data.IntMap.Strict (IntMap) | ||
32 | import qualified Data.Map.Strict as Map | ||
33 | ;import Data.Map.Strict (Map) | ||
34 | import Data.Time.Clock.POSIX | ||
35 | import qualified Data.Word64Map as W64Map | ||
36 | ;import Data.Word64Map (Word64Map) | ||
37 | import Data.Word | ||
38 | import Data.Maybe | ||
39 | import GHC.Conc (closeFdWith) | ||
40 | import GHC.Event | ||
41 | import Network.Socket | ||
42 | import Network.Socket.ByteString as B | ||
43 | import System.Endian | ||
44 | import System.IO | ||
45 | import System.IO.Error | ||
46 | import System.Timeout | ||
47 | |||
48 | import DPut | ||
49 | import DebugTag | ||
50 | import Data.TableMethods | ||
51 | |||
52 | -- | An inbound packet or condition raised while monitoring a connection. | ||
53 | data Arrival err addr x | ||
54 | = Terminated -- ^ Virtual message that signals EOF. | ||
55 | | ParseError !err -- ^ A badly-formed message was received. | ||
56 | | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message. | ||
57 | |||
58 | -- | Three methods are required to implement a datagram based query\/response protocol. | ||
59 | data TransportA err addr x y = Transport | ||
60 | { -- | Blocks until an inbound packet is available. Then calls the provided | ||
61 | -- continuation with the packet and origin addres or an error condition. | ||
62 | awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a) | ||
63 | -- | Send an /y/ packet to the given destination /addr/. | ||
64 | , sendMessage :: addr -> y -> IO () | ||
65 | -- | Shutdown and clean up any state related to this 'Transport'. | ||
66 | , setActive :: Bool -> IO () | ||
67 | } | ||
68 | |||
69 | type Transport err addr x = TransportA err addr x x | ||
70 | |||
71 | closeTransport :: TransportA err addr x y -> IO () | ||
72 | closeTransport tr = setActive tr False | ||
73 | |||
74 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
75 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
76 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
77 | -- associated public keys. | ||
78 | layerTransportM :: | ||
79 | (x -> addr -> IO (Either err (x', addr'))) | ||
80 | -- ^ Function that attempts to transform a low-level address/packet | ||
81 | -- pair into a higher level representation. | ||
82 | -> (y' -> addr' -> IO (y, addr)) | ||
83 | -- ^ Function to encode a high-level address/packet into a lower level | ||
84 | -- representation. | ||
85 | -> TransportA err addr x y | ||
86 | -- ^ The low-level transport to be transformed. | ||
87 | -> TransportA err addr' x' y' | ||
88 | layerTransportM parse encode tr = | ||
89 | tr { awaitMessage = \kont -> | ||
90 | awaitMessage tr $ \case | ||
91 | Terminated -> kont $ Terminated | ||
92 | ParseError e -> kont $ ParseError e | ||
93 | Arrival addr x -> parse x addr >>= \case | ||
94 | Left e -> kont $ ParseError e | ||
95 | Right (x',addr') -> kont $ Arrival addr' x' | ||
96 | , sendMessage = \addr' msg' -> do | ||
97 | (msg,addr) <- encode msg' addr' | ||
98 | sendMessage tr addr msg | ||
99 | } | ||
100 | |||
101 | |||
102 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
103 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
104 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
105 | -- associated public keys. | ||
106 | layerTransport :: | ||
107 | (x -> addr -> Either err (x', addr')) | ||
108 | -- ^ Function that attempts to transform a low-level address/packet | ||
109 | -- pair into a higher level representation. | ||
110 | -> (y' -> addr' -> (y, addr)) | ||
111 | -- ^ Function to encode a high-level address/packet into a lower level | ||
112 | -- representation. | ||
113 | -> TransportA err addr x y | ||
114 | -- ^ The low-level transport to be transformed. | ||
115 | -> TransportA err addr' x' y' | ||
116 | layerTransport parse encode tr = | ||
117 | layerTransportM (\x addr -> return $ parse x addr) | ||
118 | (\x' addr' -> return $ encode x' addr') | ||
119 | tr | ||
120 | |||
121 | -- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' | ||
122 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
123 | -- both returned 'Transport's to avoid hanging. | ||
124 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | ||
125 | -> ((y,xaddr) -> IO (Maybe (c,a))) | ||
126 | -> TransportA err a b c | ||
127 | -> IO (TransportA err xaddr x y, TransportA err a b c) | ||
128 | partitionTransportM parse encodex tr = do | ||
129 | tchan <- atomically newTChan | ||
130 | let ytr = tr { awaitMessage = \kont -> fix $ \again -> do | ||
131 | awaitMessage tr $ \m -> case m of | ||
132 | Arrival adr msg -> parse (msg,adr) >>= \case | ||
133 | Left x -> atomically (writeTChan tchan (Just x)) >> join (atomically again) | ||
134 | Right (y,yaddr) -> kont $ Arrival yaddr y | ||
135 | ParseError e -> kont $ ParseError e | ||
136 | Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated | ||
137 | , sendMessage = sendMessage tr | ||
138 | } | ||
139 | xtr = Transport | ||
140 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case | ||
141 | Nothing -> Terminated | ||
142 | Just (x,xaddr) -> Arrival xaddr x | ||
143 | , sendMessage = \addr' msg' -> do | ||
144 | msg_addr <- encodex (msg',addr') | ||
145 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | ||
146 | , setActive = const $ return () | ||
147 | } | ||
148 | return (xtr, ytr) | ||
149 | |||
150 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan' | ||
151 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
152 | -- both returned 'Transport's to avoid hanging. | ||
153 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | ||
154 | -> ((y,xaddr) -> Maybe (c,a)) | ||
155 | -> TransportA err a b c | ||
156 | -> IO (TransportA err xaddr x y, TransportA err a b c) | ||
157 | partitionTransport parse encodex tr = | ||
158 | partitionTransportM (return . parse) (return . encodex) tr | ||
159 | |||
160 | -- | | ||
161 | -- * f add x --> Nothing, consume x | ||
162 | -- --> Just id, leave x to a different handler | ||
163 | -- --> Just g, apply g to x and leave that to a different handler | ||
164 | -- | ||
165 | -- Note: If you add a handler to one of the branches before applying a | ||
166 | -- 'mergeTransports' combinator, then this handler may not block or return | ||
167 | -- Nothing. | ||
168 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> TransportA err addr x y -> TransportA err addr x y | ||
169 | addHandler onParseError f tr = tr | ||
170 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case | ||
171 | Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) | ||
172 | ParseError e -> onParseError e >> kont (ParseError e) | ||
173 | Terminated -> kont Terminated | ||
174 | } | ||
175 | |||
176 | -- | Modify a 'Transport' to invoke an action upon every received packet. | ||
177 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x | ||
178 | onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr | ||
179 | |||
180 | -- * Using a query\/response client. | ||
181 | |||
182 | -- | Fork a thread that handles inbound packets. The returned action may be used | ||
183 | -- to terminate the thread and clean up any related state. | ||
184 | -- | ||
185 | -- Example usage: | ||
186 | -- | ||
187 | -- > -- Start client. | ||
188 | -- > quitServer <- forkListener "listener" (clientNet client) | ||
189 | -- > -- Send a query q, recieve a response r. | ||
190 | -- > r <- sendQuery client method q | ||
191 | -- > -- Quit client. | ||
192 | -- > quitServer | ||
193 | forkListener :: String -> Transport err addr x -> IO (IO ()) | ||
194 | forkListener name client = do | ||
195 | setActive client True | ||
196 | thread_id <- forkIO $ do | ||
197 | myThreadId >>= flip labelThread ("listener."++name) | ||
198 | fix $ \loop -> join $ atomically $ awaitMessage client $ \case | ||
199 | Terminated -> return () | ||
200 | _ -> loop | ||
201 | dput XMisc $ "Listener died: " ++ name | ||
202 | return $ do | ||
203 | setActive client False | ||
204 | -- killThread thread_id | ||
205 | |||
206 | -- * Implementing a query\/response 'Client'. | ||
207 | |||
208 | -- | These methods indicate what should be done upon various conditions. Write | ||
209 | -- to a log file, make debug prints, or simply ignore them. | ||
210 | -- | ||
211 | -- [ /addr/ ] Address of remote peer. | ||
212 | -- | ||
213 | -- [ /x/ ] Incoming or outgoing packet. | ||
214 | -- | ||
215 | -- [ /meth/ ] Method id of incoming or outgoing request. | ||
216 | -- | ||
217 | -- [ /tid/ ] Transaction id for outgoing packet. | ||
218 | -- | ||
219 | -- [ /err/ ] Error information, typically a 'String'. | ||
220 | data ErrorReporter addr x meth tid err = ErrorReporter | ||
221 | { -- | Incoming: failed to parse packet. | ||
222 | reportParseError :: err -> IO () | ||
223 | -- | Incoming: no handler for request. | ||
224 | , reportMissingHandler :: meth -> addr -> x -> IO () | ||
225 | -- | Incoming: unable to identify request. | ||
226 | , reportUnknown :: addr -> x -> err -> IO () | ||
227 | } | ||
228 | |||
229 | ignoreErrors :: ErrorReporter addr x meth tid err | ||
230 | ignoreErrors = ErrorReporter | ||
231 | { reportParseError = \_ -> return () | ||
232 | , reportMissingHandler = \_ _ _ -> return () | ||
233 | , reportUnknown = \_ _ _ -> return () | ||
234 | } | ||
235 | |||
236 | logErrors :: ( Show addr | ||
237 | , Show meth | ||
238 | ) => ErrorReporter addr x meth tid String | ||
239 | logErrors = ErrorReporter | ||
240 | { reportParseError = \err -> dput XMisc err | ||
241 | , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" | ||
242 | , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err | ||
243 | } | ||
244 | |||
245 | printErrors :: ( Show addr | ||
246 | , Show meth | ||
247 | ) => Handle -> ErrorReporter addr x meth tid String | ||
248 | printErrors h = ErrorReporter | ||
249 | { reportParseError = \err -> hPutStrLn h err | ||
250 | , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" | ||
251 | , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err | ||
252 | } | ||
253 | |||
254 | -- Change the /err/ type for an 'ErrorReporter'. | ||
255 | instance Contravariant (ErrorReporter addr x meth tid) where | ||
256 | -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 | ||
257 | contramap f (ErrorReporter pe mh unk) | ||
258 | = ErrorReporter (\e -> pe (f e)) | ||
259 | mh | ||
260 | (\addr x e -> unk addr x (f e)) | ||
261 | |||
262 | -- | An incoming message can be classified into three cases. | ||
263 | data MessageClass err meth tid addr x | ||
264 | = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response | ||
265 | -- should include the provided /tid/ value. | ||
266 | | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. | ||
267 | | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked | ||
268 | -- with the source and destination address of a message. If it handles the | ||
269 | -- message, it should return Nothing. Otherwise, it should return a transform | ||
270 | -- (usually /id/) to apply before the next handler examines it. | ||
271 | | IsUnknown err -- ^ None of the above. | ||
272 | |||
273 | -- | Handler for an inbound query of type /x/ from an address of type _addr_. | ||
274 | type MethodHandler err tid addr x = MethodHandlerA err tid addr x x | ||
275 | |||
276 | -- | Handler for an inbound query of type /x/ with outbound response of type | ||
277 | -- /y/ to an address of type /addr/. | ||
278 | data MethodHandlerA err tid addr x y = forall a b. MethodHandler | ||
279 | { -- | Parse the query into a more specific type for this method. | ||
280 | methodParse :: x -> Either err a | ||
281 | -- | Serialize the response for transmission, given a context /ctx/ and the origin | ||
282 | -- and destination addresses. | ||
283 | , methodSerialize :: tid -> addr -> addr -> b -> y | ||
284 | -- | Fully typed action to perform upon the query. The remote origin | ||
285 | -- address of the query is provided to the handler. | ||
286 | -- | ||
287 | -- TODO: Allow queries to be ignored? | ||
288 | , methodAction :: addr -> a -> IO b | ||
289 | } | ||
290 | -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. | ||
291 | | forall a. NoReply | ||
292 | { -- | Parse the query into a more specific type for this method. | ||
293 | methodParse :: x -> Either err a | ||
294 | -- | Fully typed action to perform upon the query. The remote origin | ||
295 | -- address of the query is provided to the handler. | ||
296 | , noreplyAction :: addr -> a -> IO () | ||
297 | } | ||
298 | |||
299 | |||
300 | -- | To dispatch responses to our outbound queries, we require three | ||
301 | -- primitives. See the 'transactionMethods' function to create these | ||
302 | -- primitives out of a lookup table and a generator for transaction ids. | ||
303 | -- | ||
304 | -- The type variable /d/ is used to represent the current state of the | ||
305 | -- transaction generator and the table of pending transactions. | ||
306 | data TransactionMethods d qid addr x = TransactionMethods | ||
307 | { | ||
308 | -- | Before a query is sent, this function stores an 'MVar' to which the | ||
309 | -- response will be written too. The returned /qid/ is a transaction id | ||
310 | -- that can be used to forget the 'MVar' if the remote peer is not | ||
311 | -- responding. | ||
312 | dispatchRegister :: POSIXTime -- time of expiry | ||
313 | -> (Maybe x -> IO ()) -- callback upon response (or timeout) | ||
314 | -> addr | ||
315 | -> d | ||
316 | -> STM (qid, d) | ||
317 | -- | This method is invoked when an incoming packet /x/ indicates it is | ||
318 | -- a response to the transaction with id /qid/. The returned IO action | ||
319 | -- will write the packet to the correct 'MVar' thus completing the | ||
320 | -- dispatch. | ||
321 | , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) | ||
322 | -- | When a timeout interval elapses, this method is called to remove the | ||
323 | -- transaction from the table. | ||
324 | , dispatchCancel :: qid -> d -> STM d | ||
325 | } | ||
326 | |||
327 | -- | A set of methods necessary for dispatching incoming packets. | ||
328 | type DispatchMethods tbl err meth tid addr x = DispatchMethodsA tbl err meth tid addr x x | ||
329 | |||
330 | -- | A set of methods necessary for dispatching incoming packets. | ||
331 | data DispatchMethodsA tbl err meth tid addr x y = DispatchMethods | ||
332 | { -- | Classify an inbound packet as a query or response. | ||
333 | classifyInbound :: x -> MessageClass err meth tid addr x | ||
334 | -- | Lookup the handler for a inbound query. | ||
335 | , lookupHandler :: meth -> Maybe (MethodHandlerA err tid addr x y) | ||
336 | -- | Methods for handling incoming responses. | ||
337 | , tableMethods :: TransactionMethods tbl tid addr x | ||
338 | } | ||
339 | |||
340 | -- | All inputs required to implement a query\/response client. | ||
341 | type Client err meth tid addr x = ClientA err meth tid addr x x | ||
342 | |||
343 | -- | All inputs required to implement a query\/response client. | ||
344 | data ClientA err meth tid addr x y = forall tbl. Client | ||
345 | { -- | The 'Transport' used to dispatch and receive packets. | ||
346 | clientNet :: TransportA err addr x y | ||
347 | -- | Methods for handling inbound packets. | ||
348 | , clientDispatcher :: DispatchMethodsA tbl err meth tid addr x y | ||
349 | -- | Methods for reporting various conditions. | ||
350 | , clientErrorReporter :: ErrorReporter addr x meth tid err | ||
351 | -- | State necessary for routing inbound responses and assigning unique | ||
352 | -- /tid/ values for outgoing queries. | ||
353 | , clientPending :: TVar tbl | ||
354 | -- | An action yielding this client\'s own address. It is invoked once | ||
355 | -- on each outbound and inbound packet. It is valid for this to always | ||
356 | -- return the same value. | ||
357 | -- | ||
358 | -- The argument, if supplied, is the remote address for the transaction. | ||
359 | -- This can be used to maintain consistent aliases for specific peers. | ||
360 | , clientAddress :: Maybe addr -> IO addr | ||
361 | -- | Transform a query /tid/ value to an appropriate response /tid/ | ||
362 | -- value. Normally, this would be the identity transformation, but if | ||
363 | -- /tid/ includes a unique cryptographic nonce, then it should be | ||
364 | -- generated here. | ||
365 | , clientResponseId :: tid -> IO tid | ||
366 | } | ||
367 | |||
368 | -- | These four parameters are required to implement an outgoing query. A | ||
369 | -- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that | ||
370 | -- might be returned by 'lookupHandler'. | ||
371 | data MethodSerializerA tid addr x y meth a b = MethodSerializer | ||
372 | { -- | Returns the microseconds to wait for a response to this query being | ||
373 | -- sent to the given address. The /addr/ may also be modified to add | ||
374 | -- routing information. | ||
375 | methodTimeout :: addr -> STM (addr,Int) | ||
376 | -- | A method identifier used for error reporting. This needn't be the | ||
377 | -- same as the /meth/ argument to 'MethodHandler', but it is suggested. | ||
378 | , method :: meth | ||
379 | -- | Serialize the outgoing query /a/ into a transmittable packet /x/. | ||
380 | -- The /addr/ arguments are, respectively, our own origin address and the | ||
381 | -- destination of the request. The /tid/ argument is useful for attaching | ||
382 | -- auxiliary notations on all outgoing packets. | ||
383 | , wrapQuery :: tid -> addr -> addr -> a -> x | ||
384 | -- | Parse an inbound packet /x/ into a response /b/ for this query. | ||
385 | , unwrapResponse :: y -> b | ||
386 | } | ||
387 | |||
388 | type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth a b | ||
389 | |||
390 | microsecondsDiff :: Int -> POSIXTime | ||
391 | microsecondsDiff us = fromIntegral us / 1000000 | ||
392 | |||
393 | asyncQuery_ :: Client err meth tid addr x | ||
394 | -> MethodSerializer tid addr x meth a b | ||
395 | -> a | ||
396 | -> addr | ||
397 | -> (Maybe b -> IO ()) | ||
398 | -> IO (tid,POSIXTime,Int) | ||
399 | asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do | ||
400 | now <- getPOSIXTime | ||
401 | (tid,addr,expiry) <- atomically $ do | ||
402 | tbl <- readTVar pending | ||
403 | (addr,expiry) <- methodTimeout meth addr0 | ||
404 | (tid, tbl') <- dispatchRegister (tableMethods d) | ||
405 | (now + microsecondsDiff expiry) | ||
406 | (withResponse . fmap (unwrapResponse meth)) | ||
407 | addr -- XXX: Should be addr0 or addr? | ||
408 | tbl | ||
409 | -- (addr,expiry) <- methodTimeout meth tid addr0 | ||
410 | writeTVar pending tbl' | ||
411 | return (tid,addr,expiry) | ||
412 | self <- whoami (Just addr) | ||
413 | mres <- do sendMessage net addr (wrapQuery meth tid self addr q) | ||
414 | return $ Just () | ||
415 | `catchIOError` (\e -> return Nothing) | ||
416 | return (tid,now,expiry) | ||
417 | |||
418 | asyncQuery :: Show meth => Client err meth tid addr x | ||
419 | -> MethodSerializer tid addr x meth a b | ||
420 | -> a | ||
421 | -> addr | ||
422 | -> (Maybe b -> IO ()) | ||
423 | -> IO () | ||
424 | asyncQuery client meth q addr withResponse0 = do | ||
425 | tm <- getSystemTimerManager | ||
426 | tidvar <- newEmptyMVar | ||
427 | timedout <- registerTimeout tm 1000000 $ do | ||
428 | dput XMisc $ "async TIMEDOUT " ++ show (method meth) | ||
429 | withResponse0 Nothing | ||
430 | tid <- takeMVar tidvar | ||
431 | dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) | ||
432 | case client of | ||
433 | Client { clientDispatcher = d, clientPending = pending } -> do | ||
434 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
435 | (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do | ||
436 | unregisterTimeout tm timedout | ||
437 | withResponse0 x | ||
438 | putMVar tidvar tid | ||
439 | updateTimeout tm timedout expiry | ||
440 | dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry | ||
441 | |||
442 | -- | Send a query to a remote peer. Note that this function will always time | ||
443 | -- out if 'forkListener' was never invoked to spawn a thread to receive and | ||
444 | -- dispatch the response. | ||
445 | sendQuery :: | ||
446 | forall err a b tbl x meth tid addr. | ||
447 | Client err meth tid addr x -- ^ A query/response implementation. | ||
448 | -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. | ||
449 | -> a -- ^ The outbound query. | ||
450 | -> addr -- ^ Destination address of query. | ||
451 | -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. | ||
452 | sendQuery c@(Client net d err pending whoami _) meth q addr0 = do | ||
453 | mvar <- newEmptyMVar | ||
454 | (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) | ||
455 | mres <- timeout expiry $ takeMVar mvar | ||
456 | case mres of | ||
457 | Just b -> return $ Just b | ||
458 | Nothing -> do | ||
459 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
460 | return Nothing | ||
461 | |||
462 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x | ||
463 | contramapAddr f (MethodHandler p s a) | ||
464 | = MethodHandler | ||
465 | p | ||
466 | (\tid src dst result -> s tid (f src) (f dst) result) | ||
467 | (\addr arg -> a (f addr) arg) | ||
468 | contramapAddr f (NoReply p a) | ||
469 | = NoReply p (\addr arg -> a (f addr) arg) | ||
470 | |||
471 | -- | Query handlers can throw this to ignore a query instead of responding to | ||
472 | -- it. | ||
473 | data DropQuery = DropQuery | ||
474 | deriving Show | ||
475 | |||
476 | instance Exception DropQuery | ||
477 | |||
478 | -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the | ||
479 | -- parse is successful, the returned IO action will construct our reply if | ||
480 | -- there is one. Otherwise, a parse err is returned. | ||
481 | dispatchQuery :: MethodHandlerA err tid addr x y -- ^ Handler to invoke. | ||
482 | -> tid -- ^ The transaction id for this query\/response session. | ||
483 | -> addr -- ^ Our own address, to which the query was sent. | ||
484 | -> x -- ^ The query packet. | ||
485 | -> addr -- ^ The origin address of the query. | ||
486 | -> Either err (IO (Maybe y)) | ||
487 | dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = | ||
488 | fmap (\a -> catch (Just . wrapR tid self addr <$> f addr a) | ||
489 | (\DropQuery -> return Nothing)) | ||
490 | $ unwrapQ x | ||
491 | dispatchQuery (NoReply unwrapQ f) tid self x addr = | ||
492 | fmap (\a -> f addr a >> return Nothing) $ unwrapQ x | ||
493 | |||
494 | -- | Like 'transactionMethods' but allows extra information to be stored in the | ||
495 | -- table of pending transactions. This also enables multiple 'Client's to | ||
496 | -- share a single transaction table. | ||
497 | transactionMethods' :: | ||
498 | ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry | ||
499 | -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry | ||
500 | -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. | ||
501 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | ||
502 | -> TransactionMethods (g,t a) tid addr x | ||
503 | transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods | ||
504 | { dispatchCancel = \tid (g,t) -> return (g, delete tid t) | ||
505 | , dispatchRegister = \nowPlusExpiry v a (g,t) -> do | ||
506 | let (tid,g') = generate g | ||
507 | let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t | ||
508 | return ( tid, (g',t') ) | ||
509 | , dispatchResponse = \tid x (g,t) -> | ||
510 | case lookup tid t of | ||
511 | Just v -> let t' = delete tid t | ||
512 | in return ((g,t'),void $ load v $ Just x) | ||
513 | Nothing -> return ((g,t), return ()) | ||
514 | } | ||
515 | |||
516 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a | ||
517 | -- function for generating unique transaction ids. | ||
518 | transactionMethods :: | ||
519 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. | ||
520 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | ||
521 | -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x | ||
522 | transactionMethods methods generate = transactionMethods' id id methods generate | ||
523 | |||
524 | -- | Handle a single inbound packet and then invoke the given continuation. | ||
525 | -- The 'forkListener' function is implemented by passing this function to 'fix' | ||
526 | -- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or | ||
527 | -- throws an exception. | ||
528 | handleMessage :: | ||
529 | ClientA err meth tid addr x y | ||
530 | -> addr | ||
531 | -> x | ||
532 | -> IO (Maybe (x -> x)) | ||
533 | handleMessage (Client net d err pending whoami responseID) addr plain = do | ||
534 | -- Just (Left e) -> do reportParseError err e | ||
535 | -- return $! Just id | ||
536 | -- Just (Right (plain, addr)) -> do | ||
537 | case classifyInbound d plain of | ||
538 | IsQuery meth tid -> case lookupHandler d meth of | ||
539 | Nothing -> do reportMissingHandler err meth addr plain | ||
540 | return $! Just id | ||
541 | Just m -> do | ||
542 | self <- whoami (Just addr) | ||
543 | tid' <- responseID tid | ||
544 | either (\e -> do reportParseError err e | ||
545 | return $! Just id) | ||
546 | (>>= \m -> do mapM_ (sendMessage net addr) m | ||
547 | return $! Nothing) | ||
548 | (dispatchQuery m tid' self plain addr) | ||
549 | IsUnsolicited action -> do | ||
550 | self <- whoami (Just addr) | ||
551 | action self addr | ||
552 | return Nothing | ||
553 | IsResponse tid -> do | ||
554 | action <- atomically $ do | ||
555 | ts0 <- readTVar pending | ||
556 | (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 | ||
557 | writeTVar pending ts | ||
558 | return action | ||
559 | action | ||
560 | return $! Nothing | ||
561 | IsUnknown e -> do reportUnknown err addr plain e | ||
562 | return $! Just id | ||
563 | -- Nothing -> return $! id | ||
564 | |||
565 | -- * UDP Datagrams. | ||
566 | |||
567 | -- | Access the address family of a given 'SockAddr'. This convenient accessor | ||
568 | -- is missing from 'Network.Socket', so I implemented it here. | ||
569 | sockAddrFamily :: SockAddr -> Family | ||
570 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
571 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
572 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
573 | #if !MIN_VERSION_network(3,0,0) | ||
574 | sockAddrFamily _ = AF_CAN -- SockAddrCan constructor deprecated | ||
575 | #endif | ||
576 | |||
577 | -- | Packets with an empty payload may trigger EOF exception. | ||
578 | -- 'udpTransport' uses this function to avoid throwing in that | ||
579 | -- case. | ||
580 | ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x) | ||
581 | ignoreEOF sock isClosed def e = do | ||
582 | done <- tryReadMVar isClosed | ||
583 | case done of | ||
584 | Just () -> do close sock | ||
585 | dput XMisc "Closing UDP socket." | ||
586 | pure Terminated | ||
587 | _ -> if isEOFError e then pure def | ||
588 | else throwIO e | ||
589 | |||
590 | -- | Hard-coded maximum packet size for incoming UDP Packets received via | ||
591 | -- 'udpTransport'. | ||
592 | udpBufferSize :: Int | ||
593 | udpBufferSize = 65536 | ||
594 | |||
595 | -- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError. | ||
596 | saferSendTo :: Socket -> ByteString -> SockAddr -> IO () | ||
597 | saferSendTo sock bs saddr = void (B.sendTo sock bs saddr) | ||
598 | `catch` \e -> | ||
599 | -- sendTo: does not exist (Network is unreachable) | ||
600 | -- Occurs when IPv6 or IPv4 network is not available. | ||
601 | -- Currently, we require -threaded to prevent a forever-hang in this case. | ||
602 | if isDoesNotExistError e | ||
603 | then return () | ||
604 | else throw e | ||
605 | |||
606 | -- | Like 'udpTransport' except also returns the raw socket (for broadcast use). | ||
607 | udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket) | ||
608 | udpTransport' bind_address = do | ||
609 | let family = sockAddrFamily bind_address | ||
610 | sock <- socket family Datagram defaultProtocol | ||
611 | when (family == AF_INET6) $ do | ||
612 | setSocketOption sock IPv6Only 0 | ||
613 | setSocketOption sock Broadcast 1 | ||
614 | bind sock bind_address | ||
615 | isClosed <- newEmptyMVar | ||
616 | udpTChan <- atomically newTChan | ||
617 | let tr = Transport { | ||
618 | awaitMessage = \kont -> do | ||
619 | r <- readTChan udpTChan | ||
620 | return $ kont $! r | ||
621 | , sendMessage = case family of | ||
622 | AF_INET6 -> \case | ||
623 | (SockAddrInet port addr) -> \bs -> | ||
624 | -- Change IPv4 to 4mapped6 address. | ||
625 | saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0 | ||
626 | addr6 -> \bs -> saferSendTo sock bs addr6 | ||
627 | AF_INET -> \case | ||
628 | (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do | ||
629 | let host4 = toBE32 raw4 | ||
630 | -- Change 4mapped6 to ordinary IPv4. | ||
631 | -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4) | ||
632 | saferSendTo sock bs (SockAddrInet port host4) | ||
633 | addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) | ||
634 | addr4 -> \bs -> saferSendTo sock bs addr4 | ||
635 | _ -> \addr bs -> saferSendTo sock bs addr | ||
636 | , setActive = \case | ||
637 | False -> do | ||
638 | dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address | ||
639 | tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. | ||
640 | #if MIN_VERSION_network (3,1,0) | ||
641 | #elif MIN_VERSION_network(3,0,0) | ||
642 | let withFdSocket sock f = fdSocket sock >>= f >>= seq sock . return | ||
643 | #else | ||
644 | let withFdSocket sock f = f (fdSocket sock) >>= seq sock . return | ||
645 | #endif | ||
646 | withFdSocket sock $ \fd -> do | ||
647 | let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () | ||
648 | -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. | ||
649 | closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) | ||
650 | True -> do | ||
651 | udpThread <- forkIO $ fix $ \again -> do | ||
652 | r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do | ||
653 | uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize | ||
654 | atomically $ writeTChan udpTChan r | ||
655 | case r of Terminated -> return () | ||
656 | _ -> again | ||
657 | labelThread udpThread ("udp.io."++show bind_address) | ||
658 | } | ||
659 | return (tr, sock) | ||
660 | |||
661 | -- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The | ||
662 | -- argument is the listen-address for incoming packets. This is a useful | ||
663 | -- low-level 'Transport' that can be transformed for higher-level protocols | ||
664 | -- using 'layerTransport'. | ||
665 | udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString) | ||
666 | udpTransport bind_address = fst <$> udpTransport' bind_address | ||
667 | |||
668 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x | ||
669 | chanTransport chanFromAddr self achan aclosed = Transport | ||
670 | { awaitMessage = \kont -> do | ||
671 | x <- (uncurry (flip Arrival) <$> readTChan achan) | ||
672 | `orElse` | ||
673 | (readTVar aclosed >>= check >> return Terminated) | ||
674 | return $ kont x | ||
675 | , sendMessage = \them bs -> do | ||
676 | atomically $ writeTChan (chanFromAddr them) (bs,self) | ||
677 | , setActive = \case | ||
678 | False -> atomically $ writeTVar aclosed True | ||
679 | True -> return () | ||
680 | } | ||
681 | |||
682 | -- | Returns a pair of transports linked together to simulate two computers talking to each other. | ||
683 | testPairTransport :: IO (Transport err SockAddr ByteString, Transport err SockAddr ByteString) | ||
684 | testPairTransport = do | ||
685 | achan <- atomically newTChan | ||
686 | bchan <- atomically newTChan | ||
687 | aclosed <- atomically $ newTVar False | ||
688 | bclosed <- atomically $ newTVar False | ||
689 | let a = SockAddrInet 1 1 | ||
690 | b = SockAddrInet 2 2 | ||
691 | return ( chanTransport (const bchan) a achan aclosed | ||
692 | , chanTransport (const achan) b bchan bclosed ) | ||
693 | |||
694 | newtype ByAddress err x addr = ByAddress (Transport err addr x) | ||
695 | |||
696 | newtype Tagged x addr = Tagged x | ||
697 | |||
698 | decorateAddr :: tag addr -> Arrival e addr x -> Arrival e (DSum tag Identity) x | ||
699 | decorateAddr tag Terminated = Terminated | ||
700 | decorateAddr tag (ParseError e) = ParseError e | ||
701 | decorateAddr tag (Arrival addr x) = Arrival (tag ==> addr) x | ||
702 | |||
703 | mergeTransports :: GCompare tag => DMap tag (ByAddress err x) -> IO (Transport err (DSum tag Identity) x) | ||
704 | mergeTransports tmap = do | ||
705 | -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap | ||
706 | -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap | ||
707 | return Transport | ||
708 | { awaitMessage = \kont -> | ||
709 | foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n) | ||
710 | retry | ||
711 | tmap | ||
712 | , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of | ||
713 | Just (ByAddress tr) -> sendMessage tr addr x | ||
714 | Nothing -> return () | ||
715 | , setActive = \toggle -> foldrWithKey (\_ (ByAddress tr) next -> setActive tr toggle >> next) (return ()) tmap | ||
716 | } | ||