diff options
Diffstat (limited to 'src/Network/QueryResponse.hs')
-rw-r--r-- | src/Network/QueryResponse.hs | 638 |
1 files changed, 0 insertions, 638 deletions
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs deleted file mode 100644 index c4ff50e3..00000000 --- a/src/Network/QueryResponse.hs +++ /dev/null | |||
@@ -1,638 +0,0 @@ | |||
1 | -- | This module can implement any query\/response protocol. It was written | ||
2 | -- with Kademlia implementations in mind. | ||
3 | |||
4 | {-# LANGUAGE CPP #-} | ||
5 | {-# LANGUAGE GADTs #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | {-# LANGUAGE PartialTypeSignatures #-} | ||
8 | {-# LANGUAGE RankNTypes #-} | ||
9 | {-# LANGUAGE ScopedTypeVariables #-} | ||
10 | {-# LANGUAGE TupleSections #-} | ||
11 | module Network.QueryResponse where | ||
12 | |||
13 | #ifdef THREAD_DEBUG | ||
14 | import Control.Concurrent.Lifted.Instrument | ||
15 | #else | ||
16 | import Control.Concurrent | ||
17 | import GHC.Conc (labelThread) | ||
18 | #endif | ||
19 | import Control.Concurrent.STM | ||
20 | import Control.Exception | ||
21 | import Control.Monad | ||
22 | import qualified Data.ByteString as B | ||
23 | ;import Data.ByteString (ByteString) | ||
24 | import Data.Function | ||
25 | import Data.Functor.Contravariant | ||
26 | import qualified Data.IntMap.Strict as IntMap | ||
27 | ;import Data.IntMap.Strict (IntMap) | ||
28 | import qualified Data.Map.Strict as Map | ||
29 | ;import Data.Map.Strict (Map) | ||
30 | import Data.Time.Clock.POSIX | ||
31 | import qualified Data.Word64Map as W64Map | ||
32 | ;import Data.Word64Map (Word64Map) | ||
33 | import Data.Word | ||
34 | import Data.Maybe | ||
35 | import GHC.Event | ||
36 | import Network.Socket | ||
37 | import Network.Socket.ByteString as B | ||
38 | import System.Endian | ||
39 | import System.IO | ||
40 | import System.IO.Error | ||
41 | import System.Timeout | ||
42 | import DPut | ||
43 | import DebugTag | ||
44 | import Data.TableMethods | ||
45 | |||
46 | -- | Three methods are required to implement a datagram based query\/response protocol. | ||
47 | data TransportA err addr x y = Transport | ||
48 | { -- | Blocks until an inbound packet is available. Returns 'Nothing' when | ||
49 | -- no more packets are expected due to a shutdown or close event. | ||
50 | -- Otherwise, the packet will be parsed as type /x/ and an origin address | ||
51 | -- /addr/. Parse failure is indicated by the type 'err'. | ||
52 | awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a | ||
53 | -- | Send an /y/ packet to the given destination /addr/. | ||
54 | , sendMessage :: addr -> y -> IO () | ||
55 | -- | Shutdown and clean up any state related to this 'Transport'. | ||
56 | , closeTransport :: IO () | ||
57 | } | ||
58 | |||
59 | type Transport err addr x = TransportA err addr x x | ||
60 | |||
61 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
62 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
63 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
64 | -- associated public keys. | ||
65 | layerTransportM :: | ||
66 | (x -> addr -> IO (Either err (x', addr'))) | ||
67 | -- ^ Function that attempts to transform a low-level address/packet | ||
68 | -- pair into a higher level representation. | ||
69 | -> (y' -> addr' -> IO (y, addr)) | ||
70 | -- ^ Function to encode a high-level address/packet into a lower level | ||
71 | -- representation. | ||
72 | -> TransportA err addr x y | ||
73 | -- ^ The low-level transport to be transformed. | ||
74 | -> TransportA err addr' x' y' | ||
75 | layerTransportM parse encode tr = | ||
76 | tr { awaitMessage = \kont -> | ||
77 | awaitMessage tr $ \m -> mapM (mapM $ uncurry parse) m >>= kont . fmap join | ||
78 | , sendMessage = \addr' msg' -> do | ||
79 | (msg,addr) <- encode msg' addr' | ||
80 | sendMessage tr addr msg | ||
81 | } | ||
82 | |||
83 | |||
84 | -- | This function modifies a 'Transport' to use higher-level addresses and | ||
85 | -- packet representations. It could be used to change UDP 'ByteString's into | ||
86 | -- bencoded syntax trees or to add an encryption layer in which addresses have | ||
87 | -- associated public keys. | ||
88 | layerTransport :: | ||
89 | (x -> addr -> Either err (x', addr')) | ||
90 | -- ^ Function that attempts to transform a low-level address/packet | ||
91 | -- pair into a higher level representation. | ||
92 | -> (y' -> addr' -> (y, addr)) | ||
93 | -- ^ Function to encode a high-level address/packet into a lower level | ||
94 | -- representation. | ||
95 | -> TransportA err addr x y | ||
96 | -- ^ The low-level transport to be transformed. | ||
97 | -> TransportA err addr' x' y' | ||
98 | layerTransport parse encode tr = | ||
99 | layerTransportM (\x addr -> return $ parse x addr) | ||
100 | (\x' addr' -> return $ encode x' addr') | ||
101 | tr | ||
102 | |||
103 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' | ||
104 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
105 | -- both returned 'Transport's to avoid hanging. | ||
106 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | ||
107 | -> ((x,xaddr) -> Maybe (b,a)) | ||
108 | -> Transport err a b | ||
109 | -> IO (Transport err xaddr x, Transport err a b) | ||
110 | partitionTransport parse encodex tr = | ||
111 | partitionTransportM (return . parse) (return . encodex) tr | ||
112 | |||
113 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' | ||
114 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
115 | -- both returned 'Transport's to avoid hanging. | ||
116 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | ||
117 | -> ((x,xaddr) -> IO (Maybe (b,a))) | ||
118 | -> Transport err a b | ||
119 | -> IO (Transport err xaddr x, Transport err a b) | ||
120 | partitionTransportM parse encodex tr = do | ||
121 | mvar <- newEmptyMVar | ||
122 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | ||
123 | awaitMessage tr $ \m -> case m of | ||
124 | Just (Right msg) -> parse msg >>= | ||
125 | either (kont . Just . Right) | ||
126 | (\y -> putMVar mvar y >> again) | ||
127 | Just (Left e) -> kont $ Just (Left e) | ||
128 | Nothing -> kont Nothing | ||
129 | , sendMessage = \addr' msg' -> do | ||
130 | msg_addr <- encodex (msg',addr') | ||
131 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | ||
132 | } | ||
133 | ytr = Transport | ||
134 | { awaitMessage = \kont -> takeMVar mvar >>= kont . Just . Right | ||
135 | , sendMessage = sendMessage tr | ||
136 | , closeTransport = return () | ||
137 | } | ||
138 | return (xtr, ytr) | ||
139 | |||
140 | partitionAndForkTransport :: | ||
141 | (dst -> msg -> IO ()) | ||
142 | -> ((b,a) -> IO (Either (x,xaddr) (b,a))) | ||
143 | -> ((x,xaddr) -> IO (Maybe (Either (msg,dst) (b,a)))) | ||
144 | -> Transport err a b | ||
145 | -> IO (Transport err xaddr x, Transport err a b) | ||
146 | partitionAndForkTransport forkedSend parse encodex tr = do | ||
147 | mvar <- newEmptyMVar | ||
148 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | ||
149 | awaitMessage tr $ \m -> case m of | ||
150 | Just (Right msg) -> parse msg >>= | ||
151 | either (kont . Just . Right) | ||
152 | (\y -> putMVar mvar y >> again) | ||
153 | Just (Left e) -> kont $ Just (Left e) | ||
154 | Nothing -> kont Nothing | ||
155 | , sendMessage = \addr' msg' -> do | ||
156 | msg_addr <- encodex (msg',addr') | ||
157 | case msg_addr of | ||
158 | Just (Right (b,a)) -> sendMessage tr a b | ||
159 | Just (Left (msg,dst)) -> forkedSend dst msg | ||
160 | Nothing -> return () | ||
161 | } | ||
162 | ytr = Transport | ||
163 | { awaitMessage = \kont -> takeMVar mvar >>= kont . Just . Right | ||
164 | , sendMessage = sendMessage tr | ||
165 | , closeTransport = return () | ||
166 | } | ||
167 | return (xtr, ytr) | ||
168 | |||
169 | -- | | ||
170 | -- * f add x --> Nothing, consume x | ||
171 | -- --> Just id, leave x to a different handler | ||
172 | -- --> Just g, apply g to x and leave that to a different handler | ||
173 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x | ||
174 | addHandler onParseError f tr = tr | ||
175 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do | ||
176 | case m of | ||
177 | Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x)) | ||
178 | Just (Left e ) -> onParseError e >> kont (Just $ Left e) | ||
179 | Nothing -> kont $ Nothing | ||
180 | } | ||
181 | |||
182 | -- | Modify a 'Transport' to invoke an action upon every received packet. | ||
183 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x | ||
184 | onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr | ||
185 | |||
186 | -- * Using a query\/response client. | ||
187 | |||
188 | -- | Fork a thread that handles inbound packets. The returned action may be used | ||
189 | -- to terminate the thread and clean up any related state. | ||
190 | -- | ||
191 | -- Example usage: | ||
192 | -- | ||
193 | -- > -- Start client. | ||
194 | -- > quitServer <- forkListener "listener" (clientNet client) | ||
195 | -- > -- Send a query q, recieve a response r. | ||
196 | -- > r <- sendQuery client method q | ||
197 | -- > -- Quit client. | ||
198 | -- > quitServer | ||
199 | forkListener :: String -> Transport err addr x -> IO (IO ()) | ||
200 | forkListener name client = do | ||
201 | thread_id <- forkIO $ do | ||
202 | myThreadId >>= flip labelThread ("listener."++name) | ||
203 | fix $ awaitMessage client . const | ||
204 | dput XMisc $ "Listener died: " ++ name | ||
205 | return $ do | ||
206 | closeTransport client | ||
207 | killThread thread_id | ||
208 | |||
209 | asyncQuery_ :: Client err meth tid addr x | ||
210 | -> MethodSerializer tid addr x meth a b | ||
211 | -> a | ||
212 | -> addr | ||
213 | -> (Maybe b -> IO ()) | ||
214 | -> IO (tid,POSIXTime,Int) | ||
215 | asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do | ||
216 | now <- getPOSIXTime | ||
217 | (tid,addr,expiry) <- atomically $ do | ||
218 | tbl <- readTVar pending | ||
219 | ((tid,addr,expiry), tbl') <- dispatchRegister (tableMethods d) | ||
220 | (methodTimeout meth) | ||
221 | now | ||
222 | (withResponse . fmap (unwrapResponse meth)) | ||
223 | addr0 | ||
224 | tbl | ||
225 | -- (addr,expiry) <- methodTimeout meth tid addr0 | ||
226 | writeTVar pending tbl' | ||
227 | return (tid,addr,expiry) | ||
228 | self <- whoami (Just addr) | ||
229 | mres <- do sendMessage net addr (wrapQuery meth tid self addr q) | ||
230 | return $ Just () | ||
231 | `catchIOError` (\e -> return Nothing) | ||
232 | return (tid,now,expiry) | ||
233 | |||
234 | asyncQuery :: Show meth => Client err meth tid addr x | ||
235 | -> MethodSerializer tid addr x meth a b | ||
236 | -> a | ||
237 | -> addr | ||
238 | -> (Maybe b -> IO ()) | ||
239 | -> IO () | ||
240 | asyncQuery client meth q addr withResponse0 = do | ||
241 | tm <- getSystemTimerManager | ||
242 | tidvar <- newEmptyMVar | ||
243 | timedout <- registerTimeout tm 1000000 $ do | ||
244 | dput XMisc $ "async TIMEDOUT " ++ show (method meth) | ||
245 | withResponse0 Nothing | ||
246 | tid <- takeMVar tidvar | ||
247 | dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) | ||
248 | case client of | ||
249 | Client { clientDispatcher = d, clientPending = pending } -> do | ||
250 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
251 | (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do | ||
252 | unregisterTimeout tm timedout | ||
253 | withResponse0 x | ||
254 | putMVar tidvar tid | ||
255 | updateTimeout tm timedout expiry | ||
256 | dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry | ||
257 | |||
258 | -- | Send a query to a remote peer. Note that this function will always time | ||
259 | -- out if 'forkListener' was never invoked to spawn a thread to receive and | ||
260 | -- dispatch the response. | ||
261 | sendQuery :: | ||
262 | forall err a b tbl x meth tid addr. | ||
263 | Client err meth tid addr x -- ^ A query/response implementation. | ||
264 | -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. | ||
265 | -> a -- ^ The outbound query. | ||
266 | -> addr -- ^ Destination address of query. | ||
267 | -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. | ||
268 | sendQuery c@(Client net d err pending whoami _) meth q addr0 = do | ||
269 | mvar <- newEmptyMVar | ||
270 | (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) | ||
271 | mres <- timeout expiry $ takeMVar mvar | ||
272 | case mres of | ||
273 | Just b -> return $ Just b | ||
274 | Nothing -> do | ||
275 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
276 | return Nothing | ||
277 | |||
278 | -- * Implementing a query\/response 'Client'. | ||
279 | |||
280 | -- | All inputs required to implement a query\/response client. | ||
281 | data Client err meth tid addr x = forall tbl. Client | ||
282 | { -- | The 'Transport' used to dispatch and receive packets. | ||
283 | clientNet :: Transport err addr x | ||
284 | -- | Methods for handling inbound packets. | ||
285 | , clientDispatcher :: DispatchMethods tbl err meth tid addr x | ||
286 | -- | Methods for reporting various conditions. | ||
287 | , clientErrorReporter :: ErrorReporter addr x meth tid err | ||
288 | -- | State necessary for routing inbound responses and assigning unique | ||
289 | -- /tid/ values for outgoing queries. | ||
290 | , clientPending :: TVar tbl | ||
291 | -- | An action yielding this client\'s own address. It is invoked once | ||
292 | -- on each outbound and inbound packet. It is valid for this to always | ||
293 | -- return the same value. | ||
294 | -- | ||
295 | -- The argument, if supplied, is the remote address for the transaction. | ||
296 | -- This can be used to maintain consistent aliases for specific peers. | ||
297 | , clientAddress :: Maybe addr -> IO addr | ||
298 | -- | Transform a query /tid/ value to an appropriate response /tid/ | ||
299 | -- value. Normally, this would be the identity transformation, but if | ||
300 | -- /tid/ includes a unique cryptographic nonce, then it should be | ||
301 | -- generated here. | ||
302 | , clientResponseId :: tid -> IO tid | ||
303 | } | ||
304 | |||
305 | -- | An incoming message can be classified into three cases. | ||
306 | data MessageClass err meth tid addr x | ||
307 | = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response | ||
308 | -- should include the provided /tid/ value. | ||
309 | | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. | ||
310 | | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked | ||
311 | -- with the source and destination address of a message. If it handles the | ||
312 | -- message, it should return Nothing. Otherwise, it should return a transform | ||
313 | -- (usually /id/) to apply before the next handler examines it. | ||
314 | | IsUnknown err -- ^ None of the above. | ||
315 | |||
316 | -- | Handler for an inbound query of type /x/ from an address of type _addr_. | ||
317 | data MethodHandler err tid addr x = forall a b. MethodHandler | ||
318 | { -- | Parse the query into a more specific type for this method. | ||
319 | methodParse :: x -> Either err a | ||
320 | -- | Serialize the response for transmission, given a context /ctx/ and the origin | ||
321 | -- and destination addresses. | ||
322 | , methodSerialize :: tid -> addr -> addr -> b -> x | ||
323 | -- | Fully typed action to perform upon the query. The remote origin | ||
324 | -- address of the query is provided to the handler. | ||
325 | , methodAction :: addr -> a -> IO b | ||
326 | } | ||
327 | -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. | ||
328 | | forall a. NoReply | ||
329 | { -- | Parse the query into a more specific type for this method. | ||
330 | methodParse :: x -> Either err a | ||
331 | -- | Fully typed action to perform upon the query. The remote origin | ||
332 | -- address of the query is provided to the handler. | ||
333 | , noreplyAction :: addr -> a -> IO () | ||
334 | } | ||
335 | |||
336 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x | ||
337 | contramapAddr f (MethodHandler p s a) | ||
338 | = MethodHandler | ||
339 | p | ||
340 | (\tid src dst result -> s tid (f src) (f dst) result) | ||
341 | (\addr arg -> a (f addr) arg) | ||
342 | contramapAddr f (NoReply p a) | ||
343 | = NoReply p (\addr arg -> a (f addr) arg) | ||
344 | |||
345 | |||
346 | -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the | ||
347 | -- parse is successful, the returned IO action will construct our reply if | ||
348 | -- there is one. Otherwise, a parse err is returned. | ||
349 | dispatchQuery :: MethodHandler err tid addr x -- ^ Handler to invoke. | ||
350 | -> tid -- ^ The transaction id for this query\/response session. | ||
351 | -> addr -- ^ Our own address, to which the query was sent. | ||
352 | -> x -- ^ The query packet. | ||
353 | -> addr -- ^ The origin address of the query. | ||
354 | -> Either err (IO (Maybe x)) | ||
355 | dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = | ||
356 | fmap (\a -> Just . wrapR tid self addr <$> f addr a) $ unwrapQ x | ||
357 | dispatchQuery (NoReply unwrapQ f) tid self x addr = | ||
358 | fmap (\a -> f addr a >> return Nothing) $ unwrapQ x | ||
359 | |||
360 | -- | These four parameters are required to implement an outgoing query. A | ||
361 | -- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that | ||
362 | -- might be returned by 'lookupHandler'. | ||
363 | data MethodSerializer tid addr x meth a b = MethodSerializer | ||
364 | { -- | Returns the microseconds to wait for a response to this query being | ||
365 | -- sent to the given address. The /addr/ may also be modified to add | ||
366 | -- routing information. | ||
367 | methodTimeout :: tid -> addr -> STM (addr,Int) | ||
368 | -- | A method identifier used for error reporting. This needn't be the | ||
369 | -- same as the /meth/ argument to 'MethodHandler', but it is suggested. | ||
370 | , method :: meth | ||
371 | -- | Serialize the outgoing query /a/ into a transmittable packet /x/. | ||
372 | -- The /addr/ arguments are, respectively, our own origin address and the | ||
373 | -- destination of the request. The /tid/ argument is useful for attaching | ||
374 | -- auxiliary notations on all outgoing packets. | ||
375 | , wrapQuery :: tid -> addr -> addr -> a -> x | ||
376 | -- | Parse an inbound packet /x/ into a response /b/ for this query. | ||
377 | , unwrapResponse :: x -> b | ||
378 | } | ||
379 | |||
380 | |||
381 | -- | To dispatch responses to our outbound queries, we require three | ||
382 | -- primitives. See the 'transactionMethods' function to create these | ||
383 | -- primitives out of a lookup table and a generator for transaction ids. | ||
384 | -- | ||
385 | -- The type variable /d/ is used to represent the current state of the | ||
386 | -- transaction generator and the table of pending transactions. | ||
387 | data TransactionMethods d tid addr x = TransactionMethods | ||
388 | { | ||
389 | -- | Before a query is sent, this function stores an 'MVar' to which the | ||
390 | -- response will be written too. The returned /tid/ is a transaction id | ||
391 | -- that can be used to forget the 'MVar' if the remote peer is not | ||
392 | -- responding. | ||
393 | dispatchRegister :: (tid -> addr -> STM (addr,Int)) -> POSIXTime -> (Maybe x -> IO ()) -> addr -> d -> STM ((tid,addr,Int), d) | ||
394 | -- | This method is invoked when an incoming packet /x/ indicates it is | ||
395 | -- a response to the transaction with id /tid/. The returned IO action | ||
396 | -- will write the packet to the correct 'MVar' thus completing the | ||
397 | -- dispatch. | ||
398 | , dispatchResponse :: tid -> x -> d -> STM (d, IO ()) | ||
399 | -- | When a timeout interval elapses, this method is called to remove the | ||
400 | -- transaction from the table. | ||
401 | , dispatchCancel :: tid -> d -> STM d | ||
402 | } | ||
403 | |||
404 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a | ||
405 | -- function for generating unique transaction ids. | ||
406 | transactionMethods :: | ||
407 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. | ||
408 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | ||
409 | -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x | ||
410 | transactionMethods methods generate = transactionMethods' id id methods generate | ||
411 | |||
412 | microsecondsDiff :: Int -> POSIXTime | ||
413 | microsecondsDiff us = fromIntegral us / 1000000 | ||
414 | |||
415 | -- | Like 'transactionMethods' but allows extra information to be stored in the | ||
416 | -- table of pending transactions. This also enables multiple 'Client's to | ||
417 | -- share a single transaction table. | ||
418 | transactionMethods' :: | ||
419 | ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry | ||
420 | -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry | ||
421 | -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. | ||
422 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | ||
423 | -> TransactionMethods (g,t a) tid addr x | ||
424 | transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods | ||
425 | { dispatchCancel = \tid (g,t) -> return (g, delete tid t) | ||
426 | , dispatchRegister = \getTimeout now v a0 (g,t) -> do | ||
427 | let (tid,g') = generate g | ||
428 | (a,expiry) <- getTimeout tid a0 | ||
429 | let t' = insert tid (store v) (now + microsecondsDiff expiry) t | ||
430 | return ( (tid,a,expiry), (g',t') ) | ||
431 | , dispatchResponse = \tid x (g,t) -> | ||
432 | case lookup tid t of | ||
433 | Just v -> let t' = delete tid t | ||
434 | in return ((g,t'),void $ load v $ Just x) | ||
435 | Nothing -> return ((g,t), return ()) | ||
436 | } | ||
437 | |||
438 | -- | A set of methods necessary for dispatching incoming packets. | ||
439 | data DispatchMethods tbl err meth tid addr x = DispatchMethods | ||
440 | { -- | Classify an inbound packet as a query or response. | ||
441 | classifyInbound :: x -> MessageClass err meth tid addr x | ||
442 | -- | Lookup the handler for a inbound query. | ||
443 | , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x) | ||
444 | -- | Methods for handling incoming responses. | ||
445 | , tableMethods :: TransactionMethods tbl tid addr x | ||
446 | } | ||
447 | |||
448 | -- | These methods indicate what should be done upon various conditions. Write | ||
449 | -- to a log file, make debug prints, or simply ignore them. | ||
450 | -- | ||
451 | -- [ /addr/ ] Address of remote peer. | ||
452 | -- | ||
453 | -- [ /x/ ] Incoming or outgoing packet. | ||
454 | -- | ||
455 | -- [ /meth/ ] Method id of incoming or outgoing request. | ||
456 | -- | ||
457 | -- [ /tid/ ] Transaction id for outgoing packet. | ||
458 | -- | ||
459 | -- [ /err/ ] Error information, typically a 'String'. | ||
460 | data ErrorReporter addr x meth tid err = ErrorReporter | ||
461 | { -- | Incoming: failed to parse packet. | ||
462 | reportParseError :: err -> IO () | ||
463 | -- | Incoming: no handler for request. | ||
464 | , reportMissingHandler :: meth -> addr -> x -> IO () | ||
465 | -- | Incoming: unable to identify request. | ||
466 | , reportUnknown :: addr -> x -> err -> IO () | ||
467 | } | ||
468 | |||
469 | ignoreErrors :: ErrorReporter addr x meth tid err | ||
470 | ignoreErrors = ErrorReporter | ||
471 | { reportParseError = \_ -> return () | ||
472 | , reportMissingHandler = \_ _ _ -> return () | ||
473 | , reportUnknown = \_ _ _ -> return () | ||
474 | } | ||
475 | |||
476 | logErrors :: ( Show addr | ||
477 | , Show meth | ||
478 | ) => ErrorReporter addr x meth tid String | ||
479 | logErrors = ErrorReporter | ||
480 | { reportParseError = \err -> dput XMisc err | ||
481 | , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" | ||
482 | , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err | ||
483 | } | ||
484 | |||
485 | printErrors :: ( Show addr | ||
486 | , Show meth | ||
487 | ) => Handle -> ErrorReporter addr x meth tid String | ||
488 | printErrors h = ErrorReporter | ||
489 | { reportParseError = \err -> hPutStrLn h err | ||
490 | , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" | ||
491 | , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err | ||
492 | } | ||
493 | |||
494 | -- Change the /err/ type for an 'ErrorReporter'. | ||
495 | instance Contravariant (ErrorReporter addr x meth tid) where | ||
496 | -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 | ||
497 | contramap f (ErrorReporter pe mh unk) | ||
498 | = ErrorReporter (\e -> pe (f e)) | ||
499 | mh | ||
500 | (\addr x e -> unk addr x (f e)) | ||
501 | |||
502 | -- | Handle a single inbound packet and then invoke the given continuation. | ||
503 | -- The 'forkListener' function is implemented by passing this function to 'fix' | ||
504 | -- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or | ||
505 | -- throws an exception. | ||
506 | handleMessage :: | ||
507 | Client err meth tid addr x | ||
508 | -> addr | ||
509 | -> x | ||
510 | -> IO (Maybe (x -> x)) | ||
511 | handleMessage (Client net d err pending whoami responseID) addr plain = do | ||
512 | -- Just (Left e) -> do reportParseError err e | ||
513 | -- return $! Just id | ||
514 | -- Just (Right (plain, addr)) -> do | ||
515 | case classifyInbound d plain of | ||
516 | IsQuery meth tid -> case lookupHandler d meth of | ||
517 | Nothing -> do reportMissingHandler err meth addr plain | ||
518 | return $! Just id | ||
519 | Just m -> do | ||
520 | self <- whoami (Just addr) | ||
521 | tid' <- responseID tid | ||
522 | either (\e -> do reportParseError err e | ||
523 | return $! Just id) | ||
524 | (>>= \m -> do mapM_ (sendMessage net addr) m | ||
525 | return $! Nothing) | ||
526 | (dispatchQuery m tid' self plain addr) | ||
527 | IsUnsolicited action -> do | ||
528 | self <- whoami (Just addr) | ||
529 | action self addr | ||
530 | return Nothing | ||
531 | IsResponse tid -> do | ||
532 | action <- atomically $ do | ||
533 | ts0 <- readTVar pending | ||
534 | (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 | ||
535 | writeTVar pending ts | ||
536 | return action | ||
537 | action | ||
538 | return $! Nothing | ||
539 | IsUnknown e -> do reportUnknown err addr plain e | ||
540 | return $! Just id | ||
541 | -- Nothing -> return $! id | ||
542 | |||
543 | -- * UDP Datagrams. | ||
544 | |||
545 | -- | Access the address family of a given 'SockAddr'. This convenient accessor | ||
546 | -- is missing from 'Network.Socket', so I implemented it here. | ||
547 | sockAddrFamily :: SockAddr -> Family | ||
548 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
549 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
550 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
551 | sockAddrFamily _ = AF_CAN -- SockAddrCan constructor deprecated | ||
552 | |||
553 | -- | Packets with an empty payload may trigger EOF exception. | ||
554 | -- 'udpTransport' uses this function to avoid throwing in that | ||
555 | -- case. | ||
556 | ignoreEOF :: a -> IOError -> IO a | ||
557 | ignoreEOF def e | isEOFError e = pure def | ||
558 | | otherwise = throwIO e | ||
559 | |||
560 | -- | Hard-coded maximum packet size for incoming UDP Packets received via | ||
561 | -- 'udpTransport'. | ||
562 | udpBufferSize :: Int | ||
563 | udpBufferSize = 65536 | ||
564 | |||
565 | -- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError. | ||
566 | saferSendTo :: Socket -> ByteString -> SockAddr -> IO () | ||
567 | saferSendTo sock bs saddr = void (B.sendTo sock bs saddr) | ||
568 | `catch` \e -> | ||
569 | -- sendTo: does not exist (Network is unreachable) | ||
570 | -- Occurs when IPv6 or IPv4 network is not available. | ||
571 | -- Currently, we require -threaded to prevent a forever-hang in this case. | ||
572 | if isDoesNotExistError e | ||
573 | then return () | ||
574 | else throw e | ||
575 | |||
576 | -- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The | ||
577 | -- argument is the listen-address for incoming packets. This is a useful | ||
578 | -- low-level 'Transport' that can be transformed for higher-level protocols | ||
579 | -- using 'layerTransport'. | ||
580 | udpTransport :: SockAddr -> IO (Transport err SockAddr ByteString) | ||
581 | udpTransport bind_address = fst <$> udpTransport' bind_address | ||
582 | |||
583 | -- | Like 'udpTransport' except also returns the raw socket (for broadcast use). | ||
584 | udpTransport' :: SockAddr -> IO (Transport err SockAddr ByteString, Socket) | ||
585 | udpTransport' bind_address = do | ||
586 | let family = sockAddrFamily bind_address | ||
587 | sock <- socket family Datagram defaultProtocol | ||
588 | when (family == AF_INET6) $ do | ||
589 | setSocketOption sock IPv6Only 0 | ||
590 | setSocketOption sock Broadcast 1 | ||
591 | bind sock bind_address | ||
592 | let tr = Transport { | ||
593 | awaitMessage = \kont -> do | ||
594 | r <- handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do | ||
595 | Just . Right <$!> B.recvFrom sock udpBufferSize | ||
596 | kont $! r | ||
597 | , sendMessage = case family of | ||
598 | AF_INET6 -> \case | ||
599 | (SockAddrInet port addr) -> \bs -> | ||
600 | -- Change IPv4 to 4mapped6 address. | ||
601 | saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0 | ||
602 | addr6 -> \bs -> saferSendTo sock bs addr6 | ||
603 | AF_INET -> \case | ||
604 | (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do | ||
605 | let host4 = toBE32 raw4 | ||
606 | -- Change 4mapped6 to ordinary IPv4. | ||
607 | -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4) | ||
608 | saferSendTo sock bs (SockAddrInet port host4) | ||
609 | addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) | ||
610 | addr4 -> \bs -> saferSendTo sock bs addr4 | ||
611 | _ -> \addr bs -> saferSendTo sock bs addr | ||
612 | , closeTransport = close sock | ||
613 | } | ||
614 | return (tr, sock) | ||
615 | |||
616 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x | ||
617 | chanTransport chanFromAddr self achan aclosed = Transport | ||
618 | { awaitMessage = \kont -> do | ||
619 | x <- atomically $ (Just <$> readTChan achan) | ||
620 | `orElse` | ||
621 | (readTVar aclosed >>= check >> return Nothing) | ||
622 | kont $ Right <$> x | ||
623 | , sendMessage = \them bs -> do | ||
624 | atomically $ writeTChan (chanFromAddr them) (bs,self) | ||
625 | , closeTransport = atomically $ writeTVar aclosed True | ||
626 | } | ||
627 | |||
628 | -- | Returns a pair of transports linked together to simulate two computers talking to each other. | ||
629 | testPairTransport :: IO (Transport err SockAddr ByteString, Transport err SockAddr ByteString) | ||
630 | testPairTransport = do | ||
631 | achan <- atomically newTChan | ||
632 | bchan <- atomically newTChan | ||
633 | aclosed <- atomically $ newTVar False | ||
634 | bclosed <- atomically $ newTVar False | ||
635 | let a = SockAddrInet 1 1 | ||
636 | b = SockAddrInet 2 2 | ||
637 | return ( chanTransport (const bchan) a achan aclosed | ||
638 | , chanTransport (const achan) b bchan bclosed ) | ||