diff options
Diffstat (limited to 'dht/src')
-rw-r--r-- | dht/src/Network/QueryResponse.hs | 473 |
1 files changed, 242 insertions, 231 deletions
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs index 5fcd1989..f62fbefe 100644 --- a/dht/src/Network/QueryResponse.hs +++ b/dht/src/Network/QueryResponse.hs | |||
@@ -44,13 +44,17 @@ import DPut | |||
44 | import DebugTag | 44 | import DebugTag |
45 | import Data.TableMethods | 45 | import Data.TableMethods |
46 | 46 | ||
47 | -- | An inbound packet or condition raised while monitoring a connection. | ||
48 | data Arrival err addr x | ||
49 | = Terminated -- ^ Virtual message that signals EOF. | ||
50 | | ParseError !err -- ^ A badly-formed message was recieved. | ||
51 | | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message. | ||
52 | |||
47 | -- | Three methods are required to implement a datagram based query\/response protocol. | 53 | -- | Three methods are required to implement a datagram based query\/response protocol. |
48 | data TransportA err addr x y = Transport | 54 | data TransportA err addr x y = Transport |
49 | { -- | Blocks until an inbound packet is available. Returns 'Nothing' when | 55 | { -- | Blocks until an inbound packet is available. Then calls the provided |
50 | -- no more packets are expected due to a shutdown or close event. | 56 | -- continuation with the packet and origin addres or an error condition. |
51 | -- Otherwise, the packet will be parsed as type /x/ and an origin address | 57 | awaitMessage :: forall a. (Arrival err addr x -> IO a) -> IO a |
52 | -- /addr/. Parse failure is indicated by the type 'err'. | ||
53 | awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a | ||
54 | -- | Send an /y/ packet to the given destination /addr/. | 58 | -- | Send an /y/ packet to the given destination /addr/. |
55 | , sendMessage :: addr -> y -> IO () | 59 | , sendMessage :: addr -> y -> IO () |
56 | -- | Shutdown and clean up any state related to this 'Transport'. | 60 | -- | Shutdown and clean up any state related to this 'Transport'. |
@@ -75,7 +79,12 @@ layerTransportM :: | |||
75 | -> TransportA err addr' x' y' | 79 | -> TransportA err addr' x' y' |
76 | layerTransportM parse encode tr = | 80 | layerTransportM parse encode tr = |
77 | tr { awaitMessage = \kont -> | 81 | tr { awaitMessage = \kont -> |
78 | awaitMessage tr $ \m -> mapM (mapM $ uncurry parse) m >>= kont . fmap join | 82 | awaitMessage tr $ \case |
83 | Terminated -> kont $ Terminated | ||
84 | ParseError e -> kont $ ParseError e | ||
85 | Arrival addr x -> parse x addr >>= \case | ||
86 | Left e -> kont $ ParseError e | ||
87 | Right (x',addr') -> kont $ Arrival addr' x' | ||
79 | , sendMessage = \addr' msg' -> do | 88 | , sendMessage = \addr' msg' -> do |
80 | (msg,addr) <- encode msg' addr' | 89 | (msg,addr) <- encode msg' addr' |
81 | sendMessage tr addr msg | 90 | sendMessage tr addr msg |
@@ -104,16 +113,6 @@ layerTransport parse encode tr = | |||
104 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' | 113 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' |
105 | -- is used to share the same underlying socket, so be sure to fork a thread for | 114 | -- is used to share the same underlying socket, so be sure to fork a thread for |
106 | -- both returned 'Transport's to avoid hanging. | 115 | -- both returned 'Transport's to avoid hanging. |
107 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | ||
108 | -> ((x,xaddr) -> Maybe (b,a)) | ||
109 | -> Transport err a b | ||
110 | -> IO (Transport err xaddr x, Transport err a b) | ||
111 | partitionTransport parse encodex tr = | ||
112 | partitionTransportM (return . parse) (return . encodex) tr | ||
113 | |||
114 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' | ||
115 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
116 | -- both returned 'Transport's to avoid hanging. | ||
117 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | 116 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) |
118 | -> ((x,xaddr) -> IO (Maybe (b,a))) | 117 | -> ((x,xaddr) -> IO (Maybe (b,a))) |
119 | -> Transport err a b | 118 | -> Transport err a b |
@@ -122,22 +121,35 @@ partitionTransportM parse encodex tr = do | |||
122 | mvar <- newEmptyMVar | 121 | mvar <- newEmptyMVar |
123 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | 122 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do |
124 | awaitMessage tr $ \m -> case m of | 123 | awaitMessage tr $ \m -> case m of |
125 | Just (Right msg) -> parse msg >>= | 124 | Arrival adr msg -> parse (msg,adr) >>= \case |
126 | either (kont . Just . Right) | 125 | Left (x,xaddr) -> kont $ Arrival xaddr x |
127 | (\y -> putMVar mvar (Just y) >> again) | 126 | Right y -> putMVar mvar (Just y) >> again |
128 | Just (Left e) -> kont $ Just (Left e) | 127 | ParseError e -> kont $ ParseError e |
129 | Nothing -> putMVar mvar Nothing >> kont Nothing | 128 | Terminated -> putMVar mvar Nothing >> kont Terminated |
130 | , sendMessage = \addr' msg' -> do | 129 | , sendMessage = \addr' msg' -> do |
131 | msg_addr <- encodex (msg',addr') | 130 | msg_addr <- encodex (msg',addr') |
132 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | 131 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr |
133 | } | 132 | } |
134 | ytr = Transport | 133 | ytr = Transport |
135 | { awaitMessage = \kont -> takeMVar mvar >>= kont . fmap Right | 134 | { awaitMessage = \kont -> takeMVar mvar >>= kont . \case |
135 | Nothing -> Terminated | ||
136 | Just (y,yaddr) -> Arrival yaddr y | ||
136 | , sendMessage = sendMessage tr | 137 | , sendMessage = sendMessage tr |
137 | , closeTransport = return () | 138 | , closeTransport = return () |
138 | } | 139 | } |
139 | return (xtr, ytr) | 140 | return (xtr, ytr) |
140 | 141 | ||
142 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' | ||
143 | -- is used to share the same underlying socket, so be sure to fork a thread for | ||
144 | -- both returned 'Transport's to avoid hanging. | ||
145 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | ||
146 | -> ((x,xaddr) -> Maybe (b,a)) | ||
147 | -> Transport err a b | ||
148 | -> IO (Transport err xaddr x, Transport err a b) | ||
149 | partitionTransport parse encodex tr = | ||
150 | partitionTransportM (return . parse) (return . encodex) tr | ||
151 | |||
152 | |||
141 | partitionAndForkTransport :: | 153 | partitionAndForkTransport :: |
142 | (dst -> msg -> IO ()) | 154 | (dst -> msg -> IO ()) |
143 | -> ((b,a) -> IO (Either (x,xaddr) (b,a))) | 155 | -> ((b,a) -> IO (Either (x,xaddr) (b,a))) |
@@ -147,12 +159,12 @@ partitionAndForkTransport :: | |||
147 | partitionAndForkTransport forkedSend parse encodex tr = do | 159 | partitionAndForkTransport forkedSend parse encodex tr = do |
148 | mvar <- newEmptyMVar | 160 | mvar <- newEmptyMVar |
149 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | 161 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do |
150 | awaitMessage tr $ \m -> case m of | 162 | awaitMessage tr $ \case |
151 | Just (Right msg) -> parse msg >>= | 163 | Arrival a b -> parse (b,a) >>= \case |
152 | either (kont . Just . Right) | 164 | Left (x,xaddr) -> kont $ Arrival xaddr x |
153 | (\y -> putMVar mvar (Just y) >> again) | 165 | Right (b,a) -> putMVar mvar (Arrival a b) >> again |
154 | Just (Left e) -> kont $ Just (Left e) | 166 | ParseError e -> kont $ ParseError e |
155 | Nothing -> putMVar mvar Nothing >> kont Nothing | 167 | Terminated -> putMVar mvar Terminated >> kont Terminated |
156 | , sendMessage = \addr' msg' -> do | 168 | , sendMessage = \addr' msg' -> do |
157 | msg_addr <- encodex (msg',addr') | 169 | msg_addr <- encodex (msg',addr') |
158 | case msg_addr of | 170 | case msg_addr of |
@@ -161,7 +173,7 @@ partitionAndForkTransport forkedSend parse encodex tr = do | |||
161 | Nothing -> return () | 173 | Nothing -> return () |
162 | } | 174 | } |
163 | ytr = Transport | 175 | ytr = Transport |
164 | { awaitMessage = \kont -> takeMVar mvar >>= kont . fmap Right | 176 | { awaitMessage = \kont -> takeMVar mvar >>= kont |
165 | , sendMessage = sendMessage tr | 177 | , sendMessage = sendMessage tr |
166 | , closeTransport = return () | 178 | , closeTransport = return () |
167 | } | 179 | } |
@@ -173,11 +185,10 @@ partitionAndForkTransport forkedSend parse encodex tr = do | |||
173 | -- --> Just g, apply g to x and leave that to a different handler | 185 | -- --> Just g, apply g to x and leave that to a different handler |
174 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x | 186 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x |
175 | addHandler onParseError f tr = tr | 187 | addHandler onParseError f tr = tr |
176 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do | 188 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case |
177 | case m of | 189 | Arrival addr x -> f addr x >>= maybe eat (kont . Arrival addr . ($ x)) |
178 | Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x)) | 190 | ParseError e -> onParseError e >> kont (ParseError e) |
179 | Just (Left e ) -> onParseError e >> kont (Just $ Left e) | 191 | Terminated -> kont Terminated |
180 | Nothing -> kont $ Nothing | ||
181 | } | 192 | } |
182 | 193 | ||
183 | -- | Modify a 'Transport' to invoke an action upon every received packet. | 194 | -- | Modify a 'Transport' to invoke an action upon every received packet. |
@@ -201,12 +212,186 @@ forkListener :: String -> Transport err addr x -> IO (IO ()) | |||
201 | forkListener name client = do | 212 | forkListener name client = do |
202 | thread_id <- forkIO $ do | 213 | thread_id <- forkIO $ do |
203 | myThreadId >>= flip labelThread ("listener."++name) | 214 | myThreadId >>= flip labelThread ("listener."++name) |
204 | fix $ \loop -> awaitMessage client $ maybe (return ()) (const loop) | 215 | fix $ \loop -> awaitMessage client $ \case |
216 | Terminated -> return () | ||
217 | _ -> loop | ||
205 | dput XMisc $ "Listener died: " ++ name | 218 | dput XMisc $ "Listener died: " ++ name |
206 | return $ do | 219 | return $ do |
207 | closeTransport client | 220 | closeTransport client |
208 | -- killThread thread_id | 221 | -- killThread thread_id |
209 | 222 | ||
223 | -- * Implementing a query\/response 'Client'. | ||
224 | |||
225 | -- | These methods indicate what should be done upon various conditions. Write | ||
226 | -- to a log file, make debug prints, or simply ignore them. | ||
227 | -- | ||
228 | -- [ /addr/ ] Address of remote peer. | ||
229 | -- | ||
230 | -- [ /x/ ] Incoming or outgoing packet. | ||
231 | -- | ||
232 | -- [ /meth/ ] Method id of incoming or outgoing request. | ||
233 | -- | ||
234 | -- [ /tid/ ] Transaction id for outgoing packet. | ||
235 | -- | ||
236 | -- [ /err/ ] Error information, typically a 'String'. | ||
237 | data ErrorReporter addr x meth tid err = ErrorReporter | ||
238 | { -- | Incoming: failed to parse packet. | ||
239 | reportParseError :: err -> IO () | ||
240 | -- | Incoming: no handler for request. | ||
241 | , reportMissingHandler :: meth -> addr -> x -> IO () | ||
242 | -- | Incoming: unable to identify request. | ||
243 | , reportUnknown :: addr -> x -> err -> IO () | ||
244 | } | ||
245 | |||
246 | ignoreErrors :: ErrorReporter addr x meth tid err | ||
247 | ignoreErrors = ErrorReporter | ||
248 | { reportParseError = \_ -> return () | ||
249 | , reportMissingHandler = \_ _ _ -> return () | ||
250 | , reportUnknown = \_ _ _ -> return () | ||
251 | } | ||
252 | |||
253 | logErrors :: ( Show addr | ||
254 | , Show meth | ||
255 | ) => ErrorReporter addr x meth tid String | ||
256 | logErrors = ErrorReporter | ||
257 | { reportParseError = \err -> dput XMisc err | ||
258 | , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" | ||
259 | , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err | ||
260 | } | ||
261 | |||
262 | printErrors :: ( Show addr | ||
263 | , Show meth | ||
264 | ) => Handle -> ErrorReporter addr x meth tid String | ||
265 | printErrors h = ErrorReporter | ||
266 | { reportParseError = \err -> hPutStrLn h err | ||
267 | , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" | ||
268 | , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err | ||
269 | } | ||
270 | |||
271 | -- Change the /err/ type for an 'ErrorReporter'. | ||
272 | instance Contravariant (ErrorReporter addr x meth tid) where | ||
273 | -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 | ||
274 | contramap f (ErrorReporter pe mh unk) | ||
275 | = ErrorReporter (\e -> pe (f e)) | ||
276 | mh | ||
277 | (\addr x e -> unk addr x (f e)) | ||
278 | |||
279 | -- | An incoming message can be classified into three cases. | ||
280 | data MessageClass err meth tid addr x | ||
281 | = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response | ||
282 | -- should include the provided /tid/ value. | ||
283 | | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. | ||
284 | | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked | ||
285 | -- with the source and destination address of a message. If it handles the | ||
286 | -- message, it should return Nothing. Otherwise, it should return a transform | ||
287 | -- (usually /id/) to apply before the next handler examines it. | ||
288 | | IsUnknown err -- ^ None of the above. | ||
289 | |||
290 | -- | Handler for an inbound query of type /x/ from an address of type _addr_. | ||
291 | data MethodHandler err tid addr x = forall a b. MethodHandler | ||
292 | { -- | Parse the query into a more specific type for this method. | ||
293 | methodParse :: x -> Either err a | ||
294 | -- | Serialize the response for transmission, given a context /ctx/ and the origin | ||
295 | -- and destination addresses. | ||
296 | , methodSerialize :: tid -> addr -> addr -> b -> x | ||
297 | -- | Fully typed action to perform upon the query. The remote origin | ||
298 | -- address of the query is provided to the handler. | ||
299 | , methodAction :: addr -> a -> IO b | ||
300 | } | ||
301 | -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. | ||
302 | | forall a. NoReply | ||
303 | { -- | Parse the query into a more specific type for this method. | ||
304 | methodParse :: x -> Either err a | ||
305 | -- | Fully typed action to perform upon the query. The remote origin | ||
306 | -- address of the query is provided to the handler. | ||
307 | , noreplyAction :: addr -> a -> IO () | ||
308 | } | ||
309 | |||
310 | -- | To dispatch responses to our outbound queries, we require three | ||
311 | -- primitives. See the 'transactionMethods' function to create these | ||
312 | -- primitives out of a lookup table and a generator for transaction ids. | ||
313 | -- | ||
314 | -- The type variable /d/ is used to represent the current state of the | ||
315 | -- transaction generator and the table of pending transactions. | ||
316 | data TransactionMethods d qid addr x = TransactionMethods | ||
317 | { | ||
318 | -- | Before a query is sent, this function stores an 'MVar' to which the | ||
319 | -- response will be written too. The returned /qid/ is a transaction id | ||
320 | -- that can be used to forget the 'MVar' if the remote peer is not | ||
321 | -- responding. | ||
322 | dispatchRegister :: POSIXTime -- time of expiry | ||
323 | -> (Maybe x -> IO ()) -- callback upon response (or timeout) | ||
324 | -> addr | ||
325 | -> d | ||
326 | -> STM (qid, d) | ||
327 | -- | This method is invoked when an incoming packet /x/ indicates it is | ||
328 | -- a response to the transaction with id /qid/. The returned IO action | ||
329 | -- will write the packet to the correct 'MVar' thus completing the | ||
330 | -- dispatch. | ||
331 | , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) | ||
332 | -- | When a timeout interval elapses, this method is called to remove the | ||
333 | -- transaction from the table. | ||
334 | , dispatchCancel :: qid -> d -> STM d | ||
335 | } | ||
336 | |||
337 | -- | A set of methods necessary for dispatching incoming packets. | ||
338 | data DispatchMethods tbl err meth tid addr x = DispatchMethods | ||
339 | { -- | Classify an inbound packet as a query or response. | ||
340 | classifyInbound :: x -> MessageClass err meth tid addr x | ||
341 | -- | Lookup the handler for a inbound query. | ||
342 | , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x) | ||
343 | -- | Methods for handling incoming responses. | ||
344 | , tableMethods :: TransactionMethods tbl tid addr x | ||
345 | } | ||
346 | |||
347 | -- | All inputs required to implement a query\/response client. | ||
348 | data Client err meth tid addr x = forall tbl. Client | ||
349 | { -- | The 'Transport' used to dispatch and receive packets. | ||
350 | clientNet :: Transport err addr x | ||
351 | -- | Methods for handling inbound packets. | ||
352 | , clientDispatcher :: DispatchMethods tbl err meth tid addr x | ||
353 | -- | Methods for reporting various conditions. | ||
354 | , clientErrorReporter :: ErrorReporter addr x meth tid err | ||
355 | -- | State necessary for routing inbound responses and assigning unique | ||
356 | -- /tid/ values for outgoing queries. | ||
357 | , clientPending :: TVar tbl | ||
358 | -- | An action yielding this client\'s own address. It is invoked once | ||
359 | -- on each outbound and inbound packet. It is valid for this to always | ||
360 | -- return the same value. | ||
361 | -- | ||
362 | -- The argument, if supplied, is the remote address for the transaction. | ||
363 | -- This can be used to maintain consistent aliases for specific peers. | ||
364 | , clientAddress :: Maybe addr -> IO addr | ||
365 | -- | Transform a query /tid/ value to an appropriate response /tid/ | ||
366 | -- value. Normally, this would be the identity transformation, but if | ||
367 | -- /tid/ includes a unique cryptographic nonce, then it should be | ||
368 | -- generated here. | ||
369 | , clientResponseId :: tid -> IO tid | ||
370 | } | ||
371 | |||
372 | -- | These four parameters are required to implement an outgoing query. A | ||
373 | -- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that | ||
374 | -- might be returned by 'lookupHandler'. | ||
375 | data MethodSerializer tid addr x meth a b = MethodSerializer | ||
376 | { -- | Returns the microseconds to wait for a response to this query being | ||
377 | -- sent to the given address. The /addr/ may also be modified to add | ||
378 | -- routing information. | ||
379 | methodTimeout :: addr -> STM (addr,Int) | ||
380 | -- | A method identifier used for error reporting. This needn't be the | ||
381 | -- same as the /meth/ argument to 'MethodHandler', but it is suggested. | ||
382 | , method :: meth | ||
383 | -- | Serialize the outgoing query /a/ into a transmittable packet /x/. | ||
384 | -- The /addr/ arguments are, respectively, our own origin address and the | ||
385 | -- destination of the request. The /tid/ argument is useful for attaching | ||
386 | -- auxiliary notations on all outgoing packets. | ||
387 | , wrapQuery :: tid -> addr -> addr -> a -> x | ||
388 | -- | Parse an inbound packet /x/ into a response /b/ for this query. | ||
389 | , unwrapResponse :: x -> b | ||
390 | } | ||
391 | |||
392 | microsecondsDiff :: Int -> POSIXTime | ||
393 | microsecondsDiff us = fromIntegral us / 1000000 | ||
394 | |||
210 | asyncQuery_ :: Client err meth tid addr x | 395 | asyncQuery_ :: Client err meth tid addr x |
211 | -> MethodSerializer tid addr x meth a b | 396 | -> MethodSerializer tid addr x meth a b |
212 | -> a | 397 | -> a |
@@ -276,64 +461,6 @@ sendQuery c@(Client net d err pending whoami _) meth q addr0 = do | |||
276 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | 461 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending |
277 | return Nothing | 462 | return Nothing |
278 | 463 | ||
279 | -- * Implementing a query\/response 'Client'. | ||
280 | |||
281 | -- | All inputs required to implement a query\/response client. | ||
282 | data Client err meth tid addr x = forall tbl. Client | ||
283 | { -- | The 'Transport' used to dispatch and receive packets. | ||
284 | clientNet :: Transport err addr x | ||
285 | -- | Methods for handling inbound packets. | ||
286 | , clientDispatcher :: DispatchMethods tbl err meth tid addr x | ||
287 | -- | Methods for reporting various conditions. | ||
288 | , clientErrorReporter :: ErrorReporter addr x meth tid err | ||
289 | -- | State necessary for routing inbound responses and assigning unique | ||
290 | -- /tid/ values for outgoing queries. | ||
291 | , clientPending :: TVar tbl | ||
292 | -- | An action yielding this client\'s own address. It is invoked once | ||
293 | -- on each outbound and inbound packet. It is valid for this to always | ||
294 | -- return the same value. | ||
295 | -- | ||
296 | -- The argument, if supplied, is the remote address for the transaction. | ||
297 | -- This can be used to maintain consistent aliases for specific peers. | ||
298 | , clientAddress :: Maybe addr -> IO addr | ||
299 | -- | Transform a query /tid/ value to an appropriate response /tid/ | ||
300 | -- value. Normally, this would be the identity transformation, but if | ||
301 | -- /tid/ includes a unique cryptographic nonce, then it should be | ||
302 | -- generated here. | ||
303 | , clientResponseId :: tid -> IO tid | ||
304 | } | ||
305 | |||
306 | -- | An incoming message can be classified into three cases. | ||
307 | data MessageClass err meth tid addr x | ||
308 | = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response | ||
309 | -- should include the provided /tid/ value. | ||
310 | | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. | ||
311 | | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked | ||
312 | -- with the source and destination address of a message. If it handles the | ||
313 | -- message, it should return Nothing. Otherwise, it should return a transform | ||
314 | -- (usually /id/) to apply before the next handler examines it. | ||
315 | | IsUnknown err -- ^ None of the above. | ||
316 | |||
317 | -- | Handler for an inbound query of type /x/ from an address of type _addr_. | ||
318 | data MethodHandler err tid addr x = forall a b. MethodHandler | ||
319 | { -- | Parse the query into a more specific type for this method. | ||
320 | methodParse :: x -> Either err a | ||
321 | -- | Serialize the response for transmission, given a context /ctx/ and the origin | ||
322 | -- and destination addresses. | ||
323 | , methodSerialize :: tid -> addr -> addr -> b -> x | ||
324 | -- | Fully typed action to perform upon the query. The remote origin | ||
325 | -- address of the query is provided to the handler. | ||
326 | , methodAction :: addr -> a -> IO b | ||
327 | } | ||
328 | -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. | ||
329 | | forall a. NoReply | ||
330 | { -- | Parse the query into a more specific type for this method. | ||
331 | methodParse :: x -> Either err a | ||
332 | -- | Fully typed action to perform upon the query. The remote origin | ||
333 | -- address of the query is provided to the handler. | ||
334 | , noreplyAction :: addr -> a -> IO () | ||
335 | } | ||
336 | |||
337 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x | 464 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x |
338 | contramapAddr f (MethodHandler p s a) | 465 | contramapAddr f (MethodHandler p s a) |
339 | = MethodHandler | 466 | = MethodHandler |
@@ -343,7 +470,6 @@ contramapAddr f (MethodHandler p s a) | |||
343 | contramapAddr f (NoReply p a) | 470 | contramapAddr f (NoReply p a) |
344 | = NoReply p (\addr arg -> a (f addr) arg) | 471 | = NoReply p (\addr arg -> a (f addr) arg) |
345 | 472 | ||
346 | |||
347 | -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the | 473 | -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the |
348 | -- parse is successful, the returned IO action will construct our reply if | 474 | -- parse is successful, the returned IO action will construct our reply if |
349 | -- there is one. Otherwise, a parse err is returned. | 475 | -- there is one. Otherwise, a parse err is returned. |
@@ -358,65 +484,6 @@ dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = | |||
358 | dispatchQuery (NoReply unwrapQ f) tid self x addr = | 484 | dispatchQuery (NoReply unwrapQ f) tid self x addr = |
359 | fmap (\a -> f addr a >> return Nothing) $ unwrapQ x | 485 | fmap (\a -> f addr a >> return Nothing) $ unwrapQ x |
360 | 486 | ||
361 | -- | These four parameters are required to implement an outgoing query. A | ||
362 | -- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that | ||
363 | -- might be returned by 'lookupHandler'. | ||
364 | data MethodSerializer tid addr x meth a b = MethodSerializer | ||
365 | { -- | Returns the microseconds to wait for a response to this query being | ||
366 | -- sent to the given address. The /addr/ may also be modified to add | ||
367 | -- routing information. | ||
368 | methodTimeout :: addr -> STM (addr,Int) | ||
369 | -- | A method identifier used for error reporting. This needn't be the | ||
370 | -- same as the /meth/ argument to 'MethodHandler', but it is suggested. | ||
371 | , method :: meth | ||
372 | -- | Serialize the outgoing query /a/ into a transmittable packet /x/. | ||
373 | -- The /addr/ arguments are, respectively, our own origin address and the | ||
374 | -- destination of the request. The /tid/ argument is useful for attaching | ||
375 | -- auxiliary notations on all outgoing packets. | ||
376 | , wrapQuery :: tid -> addr -> addr -> a -> x | ||
377 | -- | Parse an inbound packet /x/ into a response /b/ for this query. | ||
378 | , unwrapResponse :: x -> b | ||
379 | } | ||
380 | |||
381 | |||
382 | -- | To dispatch responses to our outbound queries, we require three | ||
383 | -- primitives. See the 'transactionMethods' function to create these | ||
384 | -- primitives out of a lookup table and a generator for transaction ids. | ||
385 | -- | ||
386 | -- The type variable /d/ is used to represent the current state of the | ||
387 | -- transaction generator and the table of pending transactions. | ||
388 | data TransactionMethods d qid addr x = TransactionMethods | ||
389 | { | ||
390 | -- | Before a query is sent, this function stores an 'MVar' to which the | ||
391 | -- response will be written too. The returned /qid/ is a transaction id | ||
392 | -- that can be used to forget the 'MVar' if the remote peer is not | ||
393 | -- responding. | ||
394 | dispatchRegister :: POSIXTime -- time of expiry | ||
395 | -> (Maybe x -> IO ()) -- callback upon response (or timeout) | ||
396 | -> addr | ||
397 | -> d | ||
398 | -> STM (qid, d) | ||
399 | -- | This method is invoked when an incoming packet /x/ indicates it is | ||
400 | -- a response to the transaction with id /qid/. The returned IO action | ||
401 | -- will write the packet to the correct 'MVar' thus completing the | ||
402 | -- dispatch. | ||
403 | , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) | ||
404 | -- | When a timeout interval elapses, this method is called to remove the | ||
405 | -- transaction from the table. | ||
406 | , dispatchCancel :: qid -> d -> STM d | ||
407 | } | ||
408 | |||
409 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a | ||
410 | -- function for generating unique transaction ids. | ||
411 | transactionMethods :: | ||
412 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. | ||
413 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | ||
414 | -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x | ||
415 | transactionMethods methods generate = transactionMethods' id id methods generate | ||
416 | |||
417 | microsecondsDiff :: Int -> POSIXTime | ||
418 | microsecondsDiff us = fromIntegral us / 1000000 | ||
419 | |||
420 | -- | Like 'transactionMethods' but allows extra information to be stored in the | 487 | -- | Like 'transactionMethods' but allows extra information to be stored in the |
421 | -- table of pending transactions. This also enables multiple 'Client's to | 488 | -- table of pending transactions. This also enables multiple 'Client's to |
422 | -- share a single transaction table. | 489 | -- share a single transaction table. |
@@ -439,69 +506,13 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr | |||
439 | Nothing -> return ((g,t), return ()) | 506 | Nothing -> return ((g,t), return ()) |
440 | } | 507 | } |
441 | 508 | ||
442 | -- | A set of methods necessary for dispatching incoming packets. | 509 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a |
443 | data DispatchMethods tbl err meth tid addr x = DispatchMethods | 510 | -- function for generating unique transaction ids. |
444 | { -- | Classify an inbound packet as a query or response. | 511 | transactionMethods :: |
445 | classifyInbound :: x -> MessageClass err meth tid addr x | 512 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. |
446 | -- | Lookup the handler for a inbound query. | 513 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. |
447 | , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x) | 514 | -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x |
448 | -- | Methods for handling incoming responses. | 515 | transactionMethods methods generate = transactionMethods' id id methods generate |
449 | , tableMethods :: TransactionMethods tbl tid addr x | ||
450 | } | ||
451 | |||
452 | -- | These methods indicate what should be done upon various conditions. Write | ||
453 | -- to a log file, make debug prints, or simply ignore them. | ||
454 | -- | ||
455 | -- [ /addr/ ] Address of remote peer. | ||
456 | -- | ||
457 | -- [ /x/ ] Incoming or outgoing packet. | ||
458 | -- | ||
459 | -- [ /meth/ ] Method id of incoming or outgoing request. | ||
460 | -- | ||
461 | -- [ /tid/ ] Transaction id for outgoing packet. | ||
462 | -- | ||
463 | -- [ /err/ ] Error information, typically a 'String'. | ||
464 | data ErrorReporter addr x meth tid err = ErrorReporter | ||
465 | { -- | Incoming: failed to parse packet. | ||
466 | reportParseError :: err -> IO () | ||
467 | -- | Incoming: no handler for request. | ||
468 | , reportMissingHandler :: meth -> addr -> x -> IO () | ||
469 | -- | Incoming: unable to identify request. | ||
470 | , reportUnknown :: addr -> x -> err -> IO () | ||
471 | } | ||
472 | |||
473 | ignoreErrors :: ErrorReporter addr x meth tid err | ||
474 | ignoreErrors = ErrorReporter | ||
475 | { reportParseError = \_ -> return () | ||
476 | , reportMissingHandler = \_ _ _ -> return () | ||
477 | , reportUnknown = \_ _ _ -> return () | ||
478 | } | ||
479 | |||
480 | logErrors :: ( Show addr | ||
481 | , Show meth | ||
482 | ) => ErrorReporter addr x meth tid String | ||
483 | logErrors = ErrorReporter | ||
484 | { reportParseError = \err -> dput XMisc err | ||
485 | , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" | ||
486 | , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err | ||
487 | } | ||
488 | |||
489 | printErrors :: ( Show addr | ||
490 | , Show meth | ||
491 | ) => Handle -> ErrorReporter addr x meth tid String | ||
492 | printErrors h = ErrorReporter | ||
493 | { reportParseError = \err -> hPutStrLn h err | ||
494 | , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" | ||
495 | , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err | ||
496 | } | ||
497 | |||
498 | -- Change the /err/ type for an 'ErrorReporter'. | ||
499 | instance Contravariant (ErrorReporter addr x meth tid) where | ||
500 | -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 | ||
501 | contramap f (ErrorReporter pe mh unk) | ||
502 | = ErrorReporter (\e -> pe (f e)) | ||
503 | mh | ||
504 | (\addr x e -> unk addr x (f e)) | ||
505 | 516 | ||
506 | -- | Handle a single inbound packet and then invoke the given continuation. | 517 | -- | Handle a single inbound packet and then invoke the given continuation. |
507 | -- The 'forkListener' function is implemented by passing this function to 'fix' | 518 | -- The 'forkListener' function is implemented by passing this function to 'fix' |
@@ -559,14 +570,14 @@ sockAddrFamily _ = AF_CAN -- SockAddrCan constructor depre | |||
559 | -- | Packets with an empty payload may trigger EOF exception. | 570 | -- | Packets with an empty payload may trigger EOF exception. |
560 | -- 'udpTransport' uses this function to avoid throwing in that | 571 | -- 'udpTransport' uses this function to avoid throwing in that |
561 | -- case. | 572 | -- case. |
562 | ignoreEOF :: Socket -> MVar () -> a -> IOError -> IO (Maybe a) | 573 | ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x) |
563 | ignoreEOF sock isClosed def e = do | 574 | ignoreEOF sock isClosed def e = do |
564 | done <- tryReadMVar isClosed | 575 | done <- tryReadMVar isClosed |
565 | case done of | 576 | case done of |
566 | Just () -> do close sock | 577 | Just () -> do close sock |
567 | dput XMisc "Closing UDP socket." | 578 | dput XMisc "Closing UDP socket." |
568 | pure Nothing | 579 | pure Terminated |
569 | _ -> if isEOFError e then pure $ Just def | 580 | _ -> if isEOFError e then pure def |
570 | else throwIO e | 581 | else throwIO e |
571 | 582 | ||
572 | -- | Hard-coded maximum packet size for incoming UDP Packets received via | 583 | -- | Hard-coded maximum packet size for incoming UDP Packets received via |
@@ -585,13 +596,6 @@ saferSendTo sock bs saddr = void (B.sendTo sock bs saddr) | |||
585 | then return () | 596 | then return () |
586 | else throw e | 597 | else throw e |
587 | 598 | ||
588 | -- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The | ||
589 | -- argument is the listen-address for incoming packets. This is a useful | ||
590 | -- low-level 'Transport' that can be transformed for higher-level protocols | ||
591 | -- using 'layerTransport'. | ||
592 | udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString) | ||
593 | udpTransport bind_address = fst <$> udpTransport' bind_address | ||
594 | |||
595 | -- | Like 'udpTransport' except also returns the raw socket (for broadcast use). | 599 | -- | Like 'udpTransport' except also returns the raw socket (for broadcast use). |
596 | udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket) | 600 | udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket) |
597 | udpTransport' bind_address = do | 601 | udpTransport' bind_address = do |
@@ -604,8 +608,8 @@ udpTransport' bind_address = do | |||
604 | isClosed <- newEmptyMVar | 608 | isClosed <- newEmptyMVar |
605 | let tr = Transport { | 609 | let tr = Transport { |
606 | awaitMessage = \kont -> do | 610 | awaitMessage = \kont -> do |
607 | r <- handle (ignoreEOF sock isClosed $ Right (B.empty, SockAddrInet 0 0)) $ do | 611 | r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do |
608 | Just . Right <$!> B.recvFrom sock udpBufferSize | 612 | uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize |
609 | kont $! r | 613 | kont $! r |
610 | , sendMessage = case family of | 614 | , sendMessage = case family of |
611 | AF_INET6 -> \case | 615 | AF_INET6 -> \case |
@@ -638,13 +642,20 @@ udpTransport' bind_address = do | |||
638 | } | 642 | } |
639 | return (tr, sock) | 643 | return (tr, sock) |
640 | 644 | ||
645 | -- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The | ||
646 | -- argument is the listen-address for incoming packets. This is a useful | ||
647 | -- low-level 'Transport' that can be transformed for higher-level protocols | ||
648 | -- using 'layerTransport'. | ||
649 | udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString) | ||
650 | udpTransport bind_address = fst <$> udpTransport' bind_address | ||
651 | |||
641 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x | 652 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x |
642 | chanTransport chanFromAddr self achan aclosed = Transport | 653 | chanTransport chanFromAddr self achan aclosed = Transport |
643 | { awaitMessage = \kont -> do | 654 | { awaitMessage = \kont -> do |
644 | x <- atomically $ (Just <$> readTChan achan) | 655 | x <- atomically $ (uncurry (flip Arrival) <$> readTChan achan) |
645 | `orElse` | 656 | `orElse` |
646 | (readTVar aclosed >>= check >> return Nothing) | 657 | (readTVar aclosed >>= check >> return Terminated) |
647 | kont $ Right <$> x | 658 | kont x |
648 | , sendMessage = \them bs -> do | 659 | , sendMessage = \them bs -> do |
649 | atomically $ writeTChan (chanFromAddr them) (bs,self) | 660 | atomically $ writeTChan (chanFromAddr them) (bs,self) |
650 | , closeTransport = atomically $ writeTVar aclosed True | 661 | , closeTransport = atomically $ writeTVar aclosed True |