summaryrefslogtreecommitdiff
path: root/dht/src
diff options
context:
space:
mode:
Diffstat (limited to 'dht/src')
-rw-r--r--dht/src/Network/QueryResponse.hs473
1 files changed, 242 insertions, 231 deletions
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs
index 5fcd1989..f62fbefe 100644
--- a/dht/src/Network/QueryResponse.hs
+++ b/dht/src/Network/QueryResponse.hs
@@ -44,13 +44,17 @@ import DPut
44import DebugTag 44import DebugTag
45import Data.TableMethods 45import Data.TableMethods
46 46
47-- | An inbound packet or condition raised while monitoring a connection.
48data Arrival err addr x
49 = Terminated -- ^ Virtual message that signals EOF.
50 | ParseError !err -- ^ A badly-formed message was recieved.
51 | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message.
52
47-- | Three methods are required to implement a datagram based query\/response protocol. 53-- | Three methods are required to implement a datagram based query\/response protocol.
48data TransportA err addr x y = Transport 54data TransportA err addr x y = Transport
49 { -- | Blocks until an inbound packet is available. Returns 'Nothing' when 55 { -- | Blocks until an inbound packet is available. Then calls the provided
50 -- no more packets are expected due to a shutdown or close event. 56 -- continuation with the packet and origin addres or an error condition.
51 -- Otherwise, the packet will be parsed as type /x/ and an origin address 57 awaitMessage :: forall a. (Arrival err addr x -> IO a) -> IO a
52 -- /addr/. Parse failure is indicated by the type 'err'.
53 awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a
54 -- | Send an /y/ packet to the given destination /addr/. 58 -- | Send an /y/ packet to the given destination /addr/.
55 , sendMessage :: addr -> y -> IO () 59 , sendMessage :: addr -> y -> IO ()
56 -- | Shutdown and clean up any state related to this 'Transport'. 60 -- | Shutdown and clean up any state related to this 'Transport'.
@@ -75,7 +79,12 @@ layerTransportM ::
75 -> TransportA err addr' x' y' 79 -> TransportA err addr' x' y'
76layerTransportM parse encode tr = 80layerTransportM parse encode tr =
77 tr { awaitMessage = \kont -> 81 tr { awaitMessage = \kont ->
78 awaitMessage tr $ \m -> mapM (mapM $ uncurry parse) m >>= kont . fmap join 82 awaitMessage tr $ \case
83 Terminated -> kont $ Terminated
84 ParseError e -> kont $ ParseError e
85 Arrival addr x -> parse x addr >>= \case
86 Left e -> kont $ ParseError e
87 Right (x',addr') -> kont $ Arrival addr' x'
79 , sendMessage = \addr' msg' -> do 88 , sendMessage = \addr' msg' -> do
80 (msg,addr) <- encode msg' addr' 89 (msg,addr) <- encode msg' addr'
81 sendMessage tr addr msg 90 sendMessage tr addr msg
@@ -104,16 +113,6 @@ layerTransport parse encode tr =
104-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' 113-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar'
105-- is used to share the same underlying socket, so be sure to fork a thread for 114-- is used to share the same underlying socket, so be sure to fork a thread for
106-- both returned 'Transport's to avoid hanging. 115-- both returned 'Transport's to avoid hanging.
107partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a))
108 -> ((x,xaddr) -> Maybe (b,a))
109 -> Transport err a b
110 -> IO (Transport err xaddr x, Transport err a b)
111partitionTransport parse encodex tr =
112 partitionTransportM (return . parse) (return . encodex) tr
113
114-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar'
115-- is used to share the same underlying socket, so be sure to fork a thread for
116-- both returned 'Transport's to avoid hanging.
117partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) 116partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
118 -> ((x,xaddr) -> IO (Maybe (b,a))) 117 -> ((x,xaddr) -> IO (Maybe (b,a)))
119 -> Transport err a b 118 -> Transport err a b
@@ -122,22 +121,35 @@ partitionTransportM parse encodex tr = do
122 mvar <- newEmptyMVar 121 mvar <- newEmptyMVar
123 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do 122 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do
124 awaitMessage tr $ \m -> case m of 123 awaitMessage tr $ \m -> case m of
125 Just (Right msg) -> parse msg >>= 124 Arrival adr msg -> parse (msg,adr) >>= \case
126 either (kont . Just . Right) 125 Left (x,xaddr) -> kont $ Arrival xaddr x
127 (\y -> putMVar mvar (Just y) >> again) 126 Right y -> putMVar mvar (Just y) >> again
128 Just (Left e) -> kont $ Just (Left e) 127 ParseError e -> kont $ ParseError e
129 Nothing -> putMVar mvar Nothing >> kont Nothing 128 Terminated -> putMVar mvar Nothing >> kont Terminated
130 , sendMessage = \addr' msg' -> do 129 , sendMessage = \addr' msg' -> do
131 msg_addr <- encodex (msg',addr') 130 msg_addr <- encodex (msg',addr')
132 mapM_ (uncurry . flip $ sendMessage tr) msg_addr 131 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
133 } 132 }
134 ytr = Transport 133 ytr = Transport
135 { awaitMessage = \kont -> takeMVar mvar >>= kont . fmap Right 134 { awaitMessage = \kont -> takeMVar mvar >>= kont . \case
135 Nothing -> Terminated
136 Just (y,yaddr) -> Arrival yaddr y
136 , sendMessage = sendMessage tr 137 , sendMessage = sendMessage tr
137 , closeTransport = return () 138 , closeTransport = return ()
138 } 139 }
139 return (xtr, ytr) 140 return (xtr, ytr)
140 141
142-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar'
143-- is used to share the same underlying socket, so be sure to fork a thread for
144-- both returned 'Transport's to avoid hanging.
145partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a))
146 -> ((x,xaddr) -> Maybe (b,a))
147 -> Transport err a b
148 -> IO (Transport err xaddr x, Transport err a b)
149partitionTransport parse encodex tr =
150 partitionTransportM (return . parse) (return . encodex) tr
151
152
141partitionAndForkTransport :: 153partitionAndForkTransport ::
142 (dst -> msg -> IO ()) 154 (dst -> msg -> IO ())
143 -> ((b,a) -> IO (Either (x,xaddr) (b,a))) 155 -> ((b,a) -> IO (Either (x,xaddr) (b,a)))
@@ -147,12 +159,12 @@ partitionAndForkTransport ::
147partitionAndForkTransport forkedSend parse encodex tr = do 159partitionAndForkTransport forkedSend parse encodex tr = do
148 mvar <- newEmptyMVar 160 mvar <- newEmptyMVar
149 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do 161 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do
150 awaitMessage tr $ \m -> case m of 162 awaitMessage tr $ \case
151 Just (Right msg) -> parse msg >>= 163 Arrival a b -> parse (b,a) >>= \case
152 either (kont . Just . Right) 164 Left (x,xaddr) -> kont $ Arrival xaddr x
153 (\y -> putMVar mvar (Just y) >> again) 165 Right (b,a) -> putMVar mvar (Arrival a b) >> again
154 Just (Left e) -> kont $ Just (Left e) 166 ParseError e -> kont $ ParseError e
155 Nothing -> putMVar mvar Nothing >> kont Nothing 167 Terminated -> putMVar mvar Terminated >> kont Terminated
156 , sendMessage = \addr' msg' -> do 168 , sendMessage = \addr' msg' -> do
157 msg_addr <- encodex (msg',addr') 169 msg_addr <- encodex (msg',addr')
158 case msg_addr of 170 case msg_addr of
@@ -161,7 +173,7 @@ partitionAndForkTransport forkedSend parse encodex tr = do
161 Nothing -> return () 173 Nothing -> return ()
162 } 174 }
163 ytr = Transport 175 ytr = Transport
164 { awaitMessage = \kont -> takeMVar mvar >>= kont . fmap Right 176 { awaitMessage = \kont -> takeMVar mvar >>= kont
165 , sendMessage = sendMessage tr 177 , sendMessage = sendMessage tr
166 , closeTransport = return () 178 , closeTransport = return ()
167 } 179 }
@@ -173,11 +185,10 @@ partitionAndForkTransport forkedSend parse encodex tr = do
173-- --> Just g, apply g to x and leave that to a different handler 185-- --> Just g, apply g to x and leave that to a different handler
174addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x 186addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x
175addHandler onParseError f tr = tr 187addHandler onParseError f tr = tr
176 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do 188 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case
177 case m of 189 Arrival addr x -> f addr x >>= maybe eat (kont . Arrival addr . ($ x))
178 Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x)) 190 ParseError e -> onParseError e >> kont (ParseError e)
179 Just (Left e ) -> onParseError e >> kont (Just $ Left e) 191 Terminated -> kont Terminated
180 Nothing -> kont $ Nothing
181 } 192 }
182 193
183-- | Modify a 'Transport' to invoke an action upon every received packet. 194-- | Modify a 'Transport' to invoke an action upon every received packet.
@@ -201,12 +212,186 @@ forkListener :: String -> Transport err addr x -> IO (IO ())
201forkListener name client = do 212forkListener name client = do
202 thread_id <- forkIO $ do 213 thread_id <- forkIO $ do
203 myThreadId >>= flip labelThread ("listener."++name) 214 myThreadId >>= flip labelThread ("listener."++name)
204 fix $ \loop -> awaitMessage client $ maybe (return ()) (const loop) 215 fix $ \loop -> awaitMessage client $ \case
216 Terminated -> return ()
217 _ -> loop
205 dput XMisc $ "Listener died: " ++ name 218 dput XMisc $ "Listener died: " ++ name
206 return $ do 219 return $ do
207 closeTransport client 220 closeTransport client
208 -- killThread thread_id 221 -- killThread thread_id
209 222
223-- * Implementing a query\/response 'Client'.
224
225-- | These methods indicate what should be done upon various conditions. Write
226-- to a log file, make debug prints, or simply ignore them.
227--
228-- [ /addr/ ] Address of remote peer.
229--
230-- [ /x/ ] Incoming or outgoing packet.
231--
232-- [ /meth/ ] Method id of incoming or outgoing request.
233--
234-- [ /tid/ ] Transaction id for outgoing packet.
235--
236-- [ /err/ ] Error information, typically a 'String'.
237data ErrorReporter addr x meth tid err = ErrorReporter
238 { -- | Incoming: failed to parse packet.
239 reportParseError :: err -> IO ()
240 -- | Incoming: no handler for request.
241 , reportMissingHandler :: meth -> addr -> x -> IO ()
242 -- | Incoming: unable to identify request.
243 , reportUnknown :: addr -> x -> err -> IO ()
244 }
245
246ignoreErrors :: ErrorReporter addr x meth tid err
247ignoreErrors = ErrorReporter
248 { reportParseError = \_ -> return ()
249 , reportMissingHandler = \_ _ _ -> return ()
250 , reportUnknown = \_ _ _ -> return ()
251 }
252
253logErrors :: ( Show addr
254 , Show meth
255 ) => ErrorReporter addr x meth tid String
256logErrors = ErrorReporter
257 { reportParseError = \err -> dput XMisc err
258 , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")"
259 , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err
260 }
261
262printErrors :: ( Show addr
263 , Show meth
264 ) => Handle -> ErrorReporter addr x meth tid String
265printErrors h = ErrorReporter
266 { reportParseError = \err -> hPutStrLn h err
267 , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")"
268 , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err
269 }
270
271-- Change the /err/ type for an 'ErrorReporter'.
272instance Contravariant (ErrorReporter addr x meth tid) where
273 -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5
274 contramap f (ErrorReporter pe mh unk)
275 = ErrorReporter (\e -> pe (f e))
276 mh
277 (\addr x e -> unk addr x (f e))
278
279-- | An incoming message can be classified into three cases.
280data MessageClass err meth tid addr x
281 = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response
282 -- should include the provided /tid/ value.
283 | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value.
284 | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked
285 -- with the source and destination address of a message. If it handles the
286 -- message, it should return Nothing. Otherwise, it should return a transform
287 -- (usually /id/) to apply before the next handler examines it.
288 | IsUnknown err -- ^ None of the above.
289
290-- | Handler for an inbound query of type /x/ from an address of type _addr_.
291data MethodHandler err tid addr x = forall a b. MethodHandler
292 { -- | Parse the query into a more specific type for this method.
293 methodParse :: x -> Either err a
294 -- | Serialize the response for transmission, given a context /ctx/ and the origin
295 -- and destination addresses.
296 , methodSerialize :: tid -> addr -> addr -> b -> x
297 -- | Fully typed action to perform upon the query. The remote origin
298 -- address of the query is provided to the handler.
299 , methodAction :: addr -> a -> IO b
300 }
301 -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary.
302 | forall a. NoReply
303 { -- | Parse the query into a more specific type for this method.
304 methodParse :: x -> Either err a
305 -- | Fully typed action to perform upon the query. The remote origin
306 -- address of the query is provided to the handler.
307 , noreplyAction :: addr -> a -> IO ()
308 }
309
310-- | To dispatch responses to our outbound queries, we require three
311-- primitives. See the 'transactionMethods' function to create these
312-- primitives out of a lookup table and a generator for transaction ids.
313--
314-- The type variable /d/ is used to represent the current state of the
315-- transaction generator and the table of pending transactions.
316data TransactionMethods d qid addr x = TransactionMethods
317 {
318 -- | Before a query is sent, this function stores an 'MVar' to which the
319 -- response will be written too. The returned /qid/ is a transaction id
320 -- that can be used to forget the 'MVar' if the remote peer is not
321 -- responding.
322 dispatchRegister :: POSIXTime -- time of expiry
323 -> (Maybe x -> IO ()) -- callback upon response (or timeout)
324 -> addr
325 -> d
326 -> STM (qid, d)
327 -- | This method is invoked when an incoming packet /x/ indicates it is
328 -- a response to the transaction with id /qid/. The returned IO action
329 -- will write the packet to the correct 'MVar' thus completing the
330 -- dispatch.
331 , dispatchResponse :: qid -> x -> d -> STM (d, IO ())
332 -- | When a timeout interval elapses, this method is called to remove the
333 -- transaction from the table.
334 , dispatchCancel :: qid -> d -> STM d
335 }
336
337-- | A set of methods necessary for dispatching incoming packets.
338data DispatchMethods tbl err meth tid addr x = DispatchMethods
339 { -- | Classify an inbound packet as a query or response.
340 classifyInbound :: x -> MessageClass err meth tid addr x
341 -- | Lookup the handler for a inbound query.
342 , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x)
343 -- | Methods for handling incoming responses.
344 , tableMethods :: TransactionMethods tbl tid addr x
345 }
346
347-- | All inputs required to implement a query\/response client.
348data Client err meth tid addr x = forall tbl. Client
349 { -- | The 'Transport' used to dispatch and receive packets.
350 clientNet :: Transport err addr x
351 -- | Methods for handling inbound packets.
352 , clientDispatcher :: DispatchMethods tbl err meth tid addr x
353 -- | Methods for reporting various conditions.
354 , clientErrorReporter :: ErrorReporter addr x meth tid err
355 -- | State necessary for routing inbound responses and assigning unique
356 -- /tid/ values for outgoing queries.
357 , clientPending :: TVar tbl
358 -- | An action yielding this client\'s own address. It is invoked once
359 -- on each outbound and inbound packet. It is valid for this to always
360 -- return the same value.
361 --
362 -- The argument, if supplied, is the remote address for the transaction.
363 -- This can be used to maintain consistent aliases for specific peers.
364 , clientAddress :: Maybe addr -> IO addr
365 -- | Transform a query /tid/ value to an appropriate response /tid/
366 -- value. Normally, this would be the identity transformation, but if
367 -- /tid/ includes a unique cryptographic nonce, then it should be
368 -- generated here.
369 , clientResponseId :: tid -> IO tid
370 }
371
372-- | These four parameters are required to implement an outgoing query. A
373-- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that
374-- might be returned by 'lookupHandler'.
375data MethodSerializer tid addr x meth a b = MethodSerializer
376 { -- | Returns the microseconds to wait for a response to this query being
377 -- sent to the given address. The /addr/ may also be modified to add
378 -- routing information.
379 methodTimeout :: addr -> STM (addr,Int)
380 -- | A method identifier used for error reporting. This needn't be the
381 -- same as the /meth/ argument to 'MethodHandler', but it is suggested.
382 , method :: meth
383 -- | Serialize the outgoing query /a/ into a transmittable packet /x/.
384 -- The /addr/ arguments are, respectively, our own origin address and the
385 -- destination of the request. The /tid/ argument is useful for attaching
386 -- auxiliary notations on all outgoing packets.
387 , wrapQuery :: tid -> addr -> addr -> a -> x
388 -- | Parse an inbound packet /x/ into a response /b/ for this query.
389 , unwrapResponse :: x -> b
390 }
391
392microsecondsDiff :: Int -> POSIXTime
393microsecondsDiff us = fromIntegral us / 1000000
394
210asyncQuery_ :: Client err meth tid addr x 395asyncQuery_ :: Client err meth tid addr x
211 -> MethodSerializer tid addr x meth a b 396 -> MethodSerializer tid addr x meth a b
212 -> a 397 -> a
@@ -276,64 +461,6 @@ sendQuery c@(Client net d err pending whoami _) meth q addr0 = do
276 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending 461 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
277 return Nothing 462 return Nothing
278 463
279-- * Implementing a query\/response 'Client'.
280
281-- | All inputs required to implement a query\/response client.
282data Client err meth tid addr x = forall tbl. Client
283 { -- | The 'Transport' used to dispatch and receive packets.
284 clientNet :: Transport err addr x
285 -- | Methods for handling inbound packets.
286 , clientDispatcher :: DispatchMethods tbl err meth tid addr x
287 -- | Methods for reporting various conditions.
288 , clientErrorReporter :: ErrorReporter addr x meth tid err
289 -- | State necessary for routing inbound responses and assigning unique
290 -- /tid/ values for outgoing queries.
291 , clientPending :: TVar tbl
292 -- | An action yielding this client\'s own address. It is invoked once
293 -- on each outbound and inbound packet. It is valid for this to always
294 -- return the same value.
295 --
296 -- The argument, if supplied, is the remote address for the transaction.
297 -- This can be used to maintain consistent aliases for specific peers.
298 , clientAddress :: Maybe addr -> IO addr
299 -- | Transform a query /tid/ value to an appropriate response /tid/
300 -- value. Normally, this would be the identity transformation, but if
301 -- /tid/ includes a unique cryptographic nonce, then it should be
302 -- generated here.
303 , clientResponseId :: tid -> IO tid
304 }
305
306-- | An incoming message can be classified into three cases.
307data MessageClass err meth tid addr x
308 = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response
309 -- should include the provided /tid/ value.
310 | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value.
311 | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked
312 -- with the source and destination address of a message. If it handles the
313 -- message, it should return Nothing. Otherwise, it should return a transform
314 -- (usually /id/) to apply before the next handler examines it.
315 | IsUnknown err -- ^ None of the above.
316
317-- | Handler for an inbound query of type /x/ from an address of type _addr_.
318data MethodHandler err tid addr x = forall a b. MethodHandler
319 { -- | Parse the query into a more specific type for this method.
320 methodParse :: x -> Either err a
321 -- | Serialize the response for transmission, given a context /ctx/ and the origin
322 -- and destination addresses.
323 , methodSerialize :: tid -> addr -> addr -> b -> x
324 -- | Fully typed action to perform upon the query. The remote origin
325 -- address of the query is provided to the handler.
326 , methodAction :: addr -> a -> IO b
327 }
328 -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary.
329 | forall a. NoReply
330 { -- | Parse the query into a more specific type for this method.
331 methodParse :: x -> Either err a
332 -- | Fully typed action to perform upon the query. The remote origin
333 -- address of the query is provided to the handler.
334 , noreplyAction :: addr -> a -> IO ()
335 }
336
337contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x 464contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x
338contramapAddr f (MethodHandler p s a) 465contramapAddr f (MethodHandler p s a)
339 = MethodHandler 466 = MethodHandler
@@ -343,7 +470,6 @@ contramapAddr f (MethodHandler p s a)
343contramapAddr f (NoReply p a) 470contramapAddr f (NoReply p a)
344 = NoReply p (\addr arg -> a (f addr) arg) 471 = NoReply p (\addr arg -> a (f addr) arg)
345 472
346
347-- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the 473-- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the
348-- parse is successful, the returned IO action will construct our reply if 474-- parse is successful, the returned IO action will construct our reply if
349-- there is one. Otherwise, a parse err is returned. 475-- there is one. Otherwise, a parse err is returned.
@@ -358,65 +484,6 @@ dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr =
358dispatchQuery (NoReply unwrapQ f) tid self x addr = 484dispatchQuery (NoReply unwrapQ f) tid self x addr =
359 fmap (\a -> f addr a >> return Nothing) $ unwrapQ x 485 fmap (\a -> f addr a >> return Nothing) $ unwrapQ x
360 486
361-- | These four parameters are required to implement an outgoing query. A
362-- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that
363-- might be returned by 'lookupHandler'.
364data MethodSerializer tid addr x meth a b = MethodSerializer
365 { -- | Returns the microseconds to wait for a response to this query being
366 -- sent to the given address. The /addr/ may also be modified to add
367 -- routing information.
368 methodTimeout :: addr -> STM (addr,Int)
369 -- | A method identifier used for error reporting. This needn't be the
370 -- same as the /meth/ argument to 'MethodHandler', but it is suggested.
371 , method :: meth
372 -- | Serialize the outgoing query /a/ into a transmittable packet /x/.
373 -- The /addr/ arguments are, respectively, our own origin address and the
374 -- destination of the request. The /tid/ argument is useful for attaching
375 -- auxiliary notations on all outgoing packets.
376 , wrapQuery :: tid -> addr -> addr -> a -> x
377 -- | Parse an inbound packet /x/ into a response /b/ for this query.
378 , unwrapResponse :: x -> b
379 }
380
381
382-- | To dispatch responses to our outbound queries, we require three
383-- primitives. See the 'transactionMethods' function to create these
384-- primitives out of a lookup table and a generator for transaction ids.
385--
386-- The type variable /d/ is used to represent the current state of the
387-- transaction generator and the table of pending transactions.
388data TransactionMethods d qid addr x = TransactionMethods
389 {
390 -- | Before a query is sent, this function stores an 'MVar' to which the
391 -- response will be written too. The returned /qid/ is a transaction id
392 -- that can be used to forget the 'MVar' if the remote peer is not
393 -- responding.
394 dispatchRegister :: POSIXTime -- time of expiry
395 -> (Maybe x -> IO ()) -- callback upon response (or timeout)
396 -> addr
397 -> d
398 -> STM (qid, d)
399 -- | This method is invoked when an incoming packet /x/ indicates it is
400 -- a response to the transaction with id /qid/. The returned IO action
401 -- will write the packet to the correct 'MVar' thus completing the
402 -- dispatch.
403 , dispatchResponse :: qid -> x -> d -> STM (d, IO ())
404 -- | When a timeout interval elapses, this method is called to remove the
405 -- transaction from the table.
406 , dispatchCancel :: qid -> d -> STM d
407 }
408
409-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a
410-- function for generating unique transaction ids.
411transactionMethods ::
412 TableMethods t tid -- ^ Table methods to lookup values by /tid/.
413 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
414 -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x
415transactionMethods methods generate = transactionMethods' id id methods generate
416
417microsecondsDiff :: Int -> POSIXTime
418microsecondsDiff us = fromIntegral us / 1000000
419
420-- | Like 'transactionMethods' but allows extra information to be stored in the 487-- | Like 'transactionMethods' but allows extra information to be stored in the
421-- table of pending transactions. This also enables multiple 'Client's to 488-- table of pending transactions. This also enables multiple 'Client's to
422-- share a single transaction table. 489-- share a single transaction table.
@@ -439,69 +506,13 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr
439 Nothing -> return ((g,t), return ()) 506 Nothing -> return ((g,t), return ())
440 } 507 }
441 508
442-- | A set of methods necessary for dispatching incoming packets. 509-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a
443data DispatchMethods tbl err meth tid addr x = DispatchMethods 510-- function for generating unique transaction ids.
444 { -- | Classify an inbound packet as a query or response. 511transactionMethods ::
445 classifyInbound :: x -> MessageClass err meth tid addr x 512 TableMethods t tid -- ^ Table methods to lookup values by /tid/.
446 -- | Lookup the handler for a inbound query. 513 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
447 , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x) 514 -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x
448 -- | Methods for handling incoming responses. 515transactionMethods methods generate = transactionMethods' id id methods generate
449 , tableMethods :: TransactionMethods tbl tid addr x
450 }
451
452-- | These methods indicate what should be done upon various conditions. Write
453-- to a log file, make debug prints, or simply ignore them.
454--
455-- [ /addr/ ] Address of remote peer.
456--
457-- [ /x/ ] Incoming or outgoing packet.
458--
459-- [ /meth/ ] Method id of incoming or outgoing request.
460--
461-- [ /tid/ ] Transaction id for outgoing packet.
462--
463-- [ /err/ ] Error information, typically a 'String'.
464data ErrorReporter addr x meth tid err = ErrorReporter
465 { -- | Incoming: failed to parse packet.
466 reportParseError :: err -> IO ()
467 -- | Incoming: no handler for request.
468 , reportMissingHandler :: meth -> addr -> x -> IO ()
469 -- | Incoming: unable to identify request.
470 , reportUnknown :: addr -> x -> err -> IO ()
471 }
472
473ignoreErrors :: ErrorReporter addr x meth tid err
474ignoreErrors = ErrorReporter
475 { reportParseError = \_ -> return ()
476 , reportMissingHandler = \_ _ _ -> return ()
477 , reportUnknown = \_ _ _ -> return ()
478 }
479
480logErrors :: ( Show addr
481 , Show meth
482 ) => ErrorReporter addr x meth tid String
483logErrors = ErrorReporter
484 { reportParseError = \err -> dput XMisc err
485 , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")"
486 , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err
487 }
488
489printErrors :: ( Show addr
490 , Show meth
491 ) => Handle -> ErrorReporter addr x meth tid String
492printErrors h = ErrorReporter
493 { reportParseError = \err -> hPutStrLn h err
494 , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")"
495 , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err
496 }
497
498-- Change the /err/ type for an 'ErrorReporter'.
499instance Contravariant (ErrorReporter addr x meth tid) where
500 -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5
501 contramap f (ErrorReporter pe mh unk)
502 = ErrorReporter (\e -> pe (f e))
503 mh
504 (\addr x e -> unk addr x (f e))
505 516
506-- | Handle a single inbound packet and then invoke the given continuation. 517-- | Handle a single inbound packet and then invoke the given continuation.
507-- The 'forkListener' function is implemented by passing this function to 'fix' 518-- The 'forkListener' function is implemented by passing this function to 'fix'
@@ -559,14 +570,14 @@ sockAddrFamily _ = AF_CAN -- SockAddrCan constructor depre
559-- | Packets with an empty payload may trigger EOF exception. 570-- | Packets with an empty payload may trigger EOF exception.
560-- 'udpTransport' uses this function to avoid throwing in that 571-- 'udpTransport' uses this function to avoid throwing in that
561-- case. 572-- case.
562ignoreEOF :: Socket -> MVar () -> a -> IOError -> IO (Maybe a) 573ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x)
563ignoreEOF sock isClosed def e = do 574ignoreEOF sock isClosed def e = do
564 done <- tryReadMVar isClosed 575 done <- tryReadMVar isClosed
565 case done of 576 case done of
566 Just () -> do close sock 577 Just () -> do close sock
567 dput XMisc "Closing UDP socket." 578 dput XMisc "Closing UDP socket."
568 pure Nothing 579 pure Terminated
569 _ -> if isEOFError e then pure $ Just def 580 _ -> if isEOFError e then pure def
570 else throwIO e 581 else throwIO e
571 582
572-- | Hard-coded maximum packet size for incoming UDP Packets received via 583-- | Hard-coded maximum packet size for incoming UDP Packets received via
@@ -585,13 +596,6 @@ saferSendTo sock bs saddr = void (B.sendTo sock bs saddr)
585 then return () 596 then return ()
586 else throw e 597 else throw e
587 598
588-- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The
589-- argument is the listen-address for incoming packets. This is a useful
590-- low-level 'Transport' that can be transformed for higher-level protocols
591-- using 'layerTransport'.
592udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString)
593udpTransport bind_address = fst <$> udpTransport' bind_address
594
595-- | Like 'udpTransport' except also returns the raw socket (for broadcast use). 599-- | Like 'udpTransport' except also returns the raw socket (for broadcast use).
596udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket) 600udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket)
597udpTransport' bind_address = do 601udpTransport' bind_address = do
@@ -604,8 +608,8 @@ udpTransport' bind_address = do
604 isClosed <- newEmptyMVar 608 isClosed <- newEmptyMVar
605 let tr = Transport { 609 let tr = Transport {
606 awaitMessage = \kont -> do 610 awaitMessage = \kont -> do
607 r <- handle (ignoreEOF sock isClosed $ Right (B.empty, SockAddrInet 0 0)) $ do 611 r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do
608 Just . Right <$!> B.recvFrom sock udpBufferSize 612 uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize
609 kont $! r 613 kont $! r
610 , sendMessage = case family of 614 , sendMessage = case family of
611 AF_INET6 -> \case 615 AF_INET6 -> \case
@@ -638,13 +642,20 @@ udpTransport' bind_address = do
638 } 642 }
639 return (tr, sock) 643 return (tr, sock)
640 644
645-- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The
646-- argument is the listen-address for incoming packets. This is a useful
647-- low-level 'Transport' that can be transformed for higher-level protocols
648-- using 'layerTransport'.
649udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString)
650udpTransport bind_address = fst <$> udpTransport' bind_address
651
641chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x 652chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x
642chanTransport chanFromAddr self achan aclosed = Transport 653chanTransport chanFromAddr self achan aclosed = Transport
643 { awaitMessage = \kont -> do 654 { awaitMessage = \kont -> do
644 x <- atomically $ (Just <$> readTChan achan) 655 x <- atomically $ (uncurry (flip Arrival) <$> readTChan achan)
645 `orElse` 656 `orElse`
646 (readTVar aclosed >>= check >> return Nothing) 657 (readTVar aclosed >>= check >> return Terminated)
647 kont $ Right <$> x 658 kont x
648 , sendMessage = \them bs -> do 659 , sendMessage = \them bs -> do
649 atomically $ writeTChan (chanFromAddr them) (bs,self) 660 atomically $ writeTChan (chanFromAddr them) (bs,self)
650 , closeTransport = atomically $ writeTVar aclosed True 661 , closeTransport = atomically $ writeTVar aclosed True