summaryrefslogtreecommitdiff
path: root/src/Network/QueryResponse.hs
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2019-01-16 21:50:19 -0500
committerJoe Crayne <joe@jerkface.net>2019-01-16 21:50:19 -0500
commitb5df06bf0fed5a30a9b16e1032037e6cea378464 (patch)
tree4cba15d7523f45911ec5682ac05c25fe6c5e6487 /src/Network/QueryResponse.hs
parentf9339cd18bceba3f5000f1d2ccd9ce7dbc5f2cb0 (diff)
Queries table: Switched MVar with callback.
Diffstat (limited to 'src/Network/QueryResponse.hs')
-rw-r--r--src/Network/QueryResponse.hs160
1 files changed, 70 insertions, 90 deletions
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs
index 13160d31..01981cc8 100644
--- a/src/Network/QueryResponse.hs
+++ b/src/Network/QueryResponse.hs
@@ -27,10 +27,12 @@ import qualified Data.IntMap.Strict as IntMap
27 ;import Data.IntMap.Strict (IntMap) 27 ;import Data.IntMap.Strict (IntMap)
28import qualified Data.Map.Strict as Map 28import qualified Data.Map.Strict as Map
29 ;import Data.Map.Strict (Map) 29 ;import Data.Map.Strict (Map)
30import Data.Time.Clock.POSIX
30import qualified Data.Word64Map as W64Map 31import qualified Data.Word64Map as W64Map
31 ;import Data.Word64Map (Word64Map) 32 ;import Data.Word64Map (Word64Map)
32import Data.Word 33import Data.Word
33import Data.Maybe 34import Data.Maybe
35import GHC.Event
34import Network.Socket 36import Network.Socket
35import Network.Socket.ByteString as B 37import Network.Socket.ByteString as B
36import System.Endian 38import System.Endian
@@ -39,6 +41,7 @@ import System.IO.Error
39import System.Timeout 41import System.Timeout
40import DPut 42import DPut
41import DebugTag 43import DebugTag
44import Data.TableMethods
42 45
43-- | Three methods are required to implement a datagram based query\/response protocol. 46-- | Three methods are required to implement a datagram based query\/response protocol.
44data TransportA err addr x y = Transport 47data TransportA err addr x y = Transport
@@ -203,6 +206,52 @@ forkListener name client = do
203 closeTransport client 206 closeTransport client
204 killThread thread_id 207 killThread thread_id
205 208
209asyncQuery_ :: Client err meth tid addr x
210 -> MethodSerializer tid addr x meth a b
211 -> a
212 -> addr
213 -> (Maybe b -> IO ())
214 -> IO (tid,POSIXTime,Int)
215asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do
216 now <- getPOSIXTime
217 (tid,addr,expiry) <- atomically $ do
218 tbl <- readTVar pending
219 ((tid,addr,expiry), tbl') <- dispatchRegister (tableMethods d)
220 (methodTimeout meth)
221 now
222 (withResponse . fmap (unwrapResponse meth))
223 addr0
224 tbl
225 -- (addr,expiry) <- methodTimeout meth tid addr0
226 writeTVar pending tbl'
227 return (tid,addr,expiry)
228 self <- whoami (Just addr)
229 mres <- do sendMessage net addr (wrapQuery meth tid self addr q)
230 return $ Just ()
231 `catchIOError` (\e -> return Nothing)
232 return (tid,now,expiry)
233
234asyncQuery :: Client err meth tid addr x
235 -> MethodSerializer tid addr x meth a b
236 -> a
237 -> addr
238 -> (Maybe b -> IO ())
239 -> IO ()
240asyncQuery client meth q addr withResponse0 = do
241 tm <- getSystemTimerManager
242 tidvar <- newEmptyMVar
243 timedout <- registerTimeout tm 300000000 $ do
244 withResponse0 Nothing
245 tid <- takeMVar tidvar
246 case client of
247 Client { clientDispatcher = d, clientPending = pending } -> do
248 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
249 (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do
250 unregisterTimeout tm timedout
251 withResponse0 x
252 putMVar tidvar tid
253 updateTimeout tm timedout expiry
254
206-- | Send a query to a remote peer. Note that this function will always time 255-- | Send a query to a remote peer. Note that this function will always time
207-- out if 'forkListener' was never invoked to spawn a thread to receive and 256-- out if 'forkListener' was never invoked to spawn a thread to receive and
208-- dispatch the response. 257-- dispatch the response.
@@ -213,25 +262,14 @@ sendQuery ::
213 -> a -- ^ The outbound query. 262 -> a -- ^ The outbound query.
214 -> addr -- ^ Destination address of query. 263 -> addr -- ^ Destination address of query.
215 -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. 264 -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out.
216sendQuery (Client net d err pending whoami _ enterQuery leaveQuery) meth q addr0 = do 265sendQuery c@(Client net d err pending whoami _) meth q addr0 = do
217 mvar <- newEmptyMVar 266 mvar <- newEmptyMVar
218 (tid,addr,expiry) <- atomically $ do 267 (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar)
219 tbl <- readTVar pending 268 mres <- timeout expiry $ takeMVar mvar
220 (tid, tbl') <- dispatchRegister (tableMethods d) mvar addr0 tbl
221 (addr,expiry) <- methodTimeout meth tid addr0
222 writeTVar pending tbl'
223 return (tid,addr,expiry)
224 self <- whoami (Just addr)
225 enterQuery tid
226 mres <- do sendMessage net addr (wrapQuery meth tid self addr q)
227 timeout expiry $ takeMVar mvar
228 `catchIOError` (\e -> return Nothing)
229 leaveQuery tid (isJust mres)
230 case mres of 269 case mres of
231 Just x -> return $ Just $ unwrapResponse meth x 270 Just b -> return $ Just b
232 Nothing -> do 271 Nothing -> do
233 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending 272 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
234 reportTimeout err (method meth) tid addr
235 return Nothing 273 return Nothing
236 274
237-- * Implementing a query\/response 'Client'. 275-- * Implementing a query\/response 'Client'.
@@ -259,10 +297,6 @@ data Client err meth tid addr x = forall tbl. Client
259 -- /tid/ includes a unique cryptographic nonce, then it should be 297 -- /tid/ includes a unique cryptographic nonce, then it should be
260 -- generated here. 298 -- generated here.
261 , clientResponseId :: tid -> IO tid 299 , clientResponseId :: tid -> IO tid
262 -- | The enter/leave methods are no-ops by default. They are useful for
263 -- serializing all queries for debugging purposes.
264 , clientEnterQuery :: tid -> IO ()
265 , clientLeaveQuery :: tid -> Bool -> IO ()
266 } 300 }
267 301
268-- | An incoming message can be classified into three cases. 302-- | An incoming message can be classified into three cases.
@@ -353,7 +387,7 @@ data TransactionMethods d tid addr x = TransactionMethods
353 -- response will be written too. The returned /tid/ is a transaction id 387 -- response will be written too. The returned /tid/ is a transaction id
354 -- that can be used to forget the 'MVar' if the remote peer is not 388 -- that can be used to forget the 'MVar' if the remote peer is not
355 -- responding. 389 -- responding.
356 dispatchRegister :: MVar x -> addr -> d -> STM (tid, d) 390 dispatchRegister :: (tid -> addr -> STM (addr,Int)) -> POSIXTime -> (Maybe x -> IO ()) -> addr -> d -> STM ((tid,addr,Int), d)
357 -- | This method is invoked when an incoming packet /x/ indicates it is 391 -- | This method is invoked when an incoming packet /x/ indicates it is
358 -- a response to the transaction with id /tid/. The returned IO action 392 -- a response to the transaction with id /tid/. The returned IO action
359 -- will write the packet to the correct 'MVar' thus completing the 393 -- will write the packet to the correct 'MVar' thus completing the
@@ -364,69 +398,37 @@ data TransactionMethods d tid addr x = TransactionMethods
364 , dispatchCancel :: tid -> d -> STM d 398 , dispatchCancel :: tid -> d -> STM d
365 } 399 }
366 400
367-- | The standard lookup table methods for use as input to 'transactionMethods'
368-- in lieu of directly implementing 'TransactionMethods'.
369data TableMethods t tid = TableMethods
370 { -- | Insert a new /tid/ entry into the transaction table.
371 tblInsert :: forall a. tid -> a -> t a -> t a
372 -- | Delete transaction /tid/ from the transaction table.
373 , tblDelete :: forall a. tid -> t a -> t a
374 -- | Lookup the value associated with transaction /tid/.
375 , tblLookup :: forall a. tid -> t a -> Maybe a
376 }
377
378-- | Methods for using 'Data.IntMap'.
379intMapMethods :: TableMethods IntMap Int
380intMapMethods = TableMethods IntMap.insert IntMap.delete IntMap.lookup
381
382-- | Methods for using 'Data.Word64Map'.
383w64MapMethods :: TableMethods Word64Map Word64
384w64MapMethods = TableMethods W64Map.insert W64Map.delete W64Map.lookup
385
386-- | Methods for using 'Data.Map'
387mapMethods :: Ord tid => TableMethods (Map tid) tid
388mapMethods = TableMethods Map.insert Map.delete Map.lookup
389
390-- | Change the key type for a lookup table implementation.
391--
392-- This can be used with 'intMapMethods' or 'mapMethods' to restrict lookups to
393-- only a part of the generated /tid/ value. This is useful for /tid/ types
394-- that are especially large due their use for other purposes, such as secure
395-- nonces for encryption.
396instance Contravariant (TableMethods t) where
397 -- contramap :: (tid -> t1) -> TableMethods t t1 -> TableMethods t tid
398 contramap f (TableMethods ins del lookup) =
399 TableMethods (\k v t -> ins (f k) v t)
400 (\k t -> del (f k) t)
401 (\k t -> lookup (f k) t)
402
403-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a 401-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a
404-- function for generating unique transaction ids. 402-- function for generating unique transaction ids.
405transactionMethods :: 403transactionMethods ::
406 TableMethods t tid -- ^ Table methods to lookup values by /tid/. 404 TableMethods t tid -- ^ Table methods to lookup values by /tid/.
407 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. 405 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
408 -> TransactionMethods (g,t (MVar x)) tid addr x 406 -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x
409transactionMethods methods generate = transactionMethods' id tryPutMVar methods generate 407transactionMethods methods generate = transactionMethods' id id methods generate
408
409microsecondsDiff :: Int -> POSIXTime
410microsecondsDiff us = fromIntegral us / 1000000
410 411
411-- | Like 'transactionMethods' but allows extra information to be stored in the 412-- | Like 'transactionMethods' but allows extra information to be stored in the
412-- table of pending transactions. This also enables multiple 'Client's to 413-- table of pending transactions. This also enables multiple 'Client's to
413-- share a single transaction table. 414-- share a single transaction table.
414transactionMethods' :: 415transactionMethods' ::
415 (MVar x -> a) -- ^ store MVar into table entry 416 ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry
416 -> (a -> x -> IO void) -- ^ load MVar from table entry 417 -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry
417 -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. 418 -> TableMethods t tid -- ^ Table methods to lookup values by /tid/.
418 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. 419 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
419 -> TransactionMethods (g,t a) tid addr x 420 -> TransactionMethods (g,t a) tid addr x
420transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods 421transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods
421 { dispatchCancel = \tid (g,t) -> return (g, delete tid t) 422 { dispatchCancel = \tid (g,t) -> return (g, delete tid t)
422 , dispatchRegister = \v _ (g,t) -> 423 , dispatchRegister = \getTimeout now v a0 (g,t) -> do
423 let (tid,g') = generate g 424 let (tid,g') = generate g
424 t' = insert tid (store v) t 425 (a,expiry) <- getTimeout tid a0
425 in return ( tid, (g',t') ) 426 let t' = insert tid (store v) (now + microsecondsDiff expiry) t
427 return ( (tid,a,expiry), (g',t') )
426 , dispatchResponse = \tid x (g,t) -> 428 , dispatchResponse = \tid x (g,t) ->
427 case lookup tid t of 429 case lookup tid t of
428 Just v -> let t' = delete tid t 430 Just v -> let t' = delete tid t
429 in return ((g,t'),void $ load v x) 431 in return ((g,t'),void $ load v $ Just x)
430 Nothing -> return ((g,t), return ()) 432 Nothing -> return ((g,t), return ())
431 } 433 }
432 434
@@ -459,8 +461,6 @@ data ErrorReporter addr x meth tid err = ErrorReporter
459 , reportMissingHandler :: meth -> addr -> x -> IO () 461 , reportMissingHandler :: meth -> addr -> x -> IO ()
460 -- | Incoming: unable to identify request. 462 -- | Incoming: unable to identify request.
461 , reportUnknown :: addr -> x -> err -> IO () 463 , reportUnknown :: addr -> x -> err -> IO ()
462 -- | Outgoing: remote peer is not responding.
463 , reportTimeout :: meth -> tid -> addr -> IO ()
464 } 464 }
465 465
466ignoreErrors :: ErrorReporter addr x meth tid err 466ignoreErrors :: ErrorReporter addr x meth tid err
@@ -468,7 +468,6 @@ ignoreErrors = ErrorReporter
468 { reportParseError = \_ -> return () 468 { reportParseError = \_ -> return ()
469 , reportMissingHandler = \_ _ _ -> return () 469 , reportMissingHandler = \_ _ _ -> return ()
470 , reportUnknown = \_ _ _ -> return () 470 , reportUnknown = \_ _ _ -> return ()
471 , reportTimeout = \_ _ _ -> return ()
472 } 471 }
473 472
474logErrors :: ( Show addr 473logErrors :: ( Show addr
@@ -478,7 +477,6 @@ logErrors = ErrorReporter
478 { reportParseError = \err -> dput XMisc err 477 { reportParseError = \err -> dput XMisc err
479 , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" 478 , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")"
480 , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err 479 , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err
481 , reportTimeout = \meth tid addr -> dput XMisc $ show addr ++ " --> Timeout ("++show meth++")"
482 } 480 }
483 481
484printErrors :: ( Show addr 482printErrors :: ( Show addr
@@ -488,17 +486,15 @@ printErrors h = ErrorReporter
488 { reportParseError = \err -> hPutStrLn h err 486 { reportParseError = \err -> hPutStrLn h err
489 , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" 487 , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")"
490 , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err 488 , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err
491 , reportTimeout = \meth tid addr -> hPutStrLn h $ show addr ++ " --> Timeout ("++show meth++")"
492 } 489 }
493 490
494-- Change the /err/ type for an 'ErrorReporter'. 491-- Change the /err/ type for an 'ErrorReporter'.
495instance Contravariant (ErrorReporter addr x meth tid) where 492instance Contravariant (ErrorReporter addr x meth tid) where
496 -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 493 -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5
497 contramap f (ErrorReporter pe mh unk tim) 494 contramap f (ErrorReporter pe mh unk)
498 = ErrorReporter (\e -> pe (f e)) 495 = ErrorReporter (\e -> pe (f e))
499 mh 496 mh
500 (\addr x e -> unk addr x (f e)) 497 (\addr x e -> unk addr x (f e))
501 tim
502 498
503-- | Handle a single inbound packet and then invoke the given continuation. 499-- | Handle a single inbound packet and then invoke the given continuation.
504-- The 'forkListener' function is implemented by passing this function to 'fix' 500-- The 'forkListener' function is implemented by passing this function to 'fix'
@@ -509,7 +505,7 @@ handleMessage ::
509 -> addr 505 -> addr
510 -> x 506 -> x
511 -> IO (Maybe (x -> x)) 507 -> IO (Maybe (x -> x))
512handleMessage (Client net d err pending whoami responseID _ _) addr plain = do 508handleMessage (Client net d err pending whoami responseID) addr plain = do
513 -- Just (Left e) -> do reportParseError err e 509 -- Just (Left e) -> do reportParseError err e
514 -- return $! Just id 510 -- return $! Just id
515 -- Just (Right (plain, addr)) -> do 511 -- Just (Right (plain, addr)) -> do
@@ -637,19 +633,3 @@ testPairTransport = do
637 b = SockAddrInet 2 2 633 b = SockAddrInet 2 2
638 return ( chanTransport (const bchan) a achan aclosed 634 return ( chanTransport (const bchan) a achan aclosed
639 , chanTransport (const achan) b bchan bclosed ) 635 , chanTransport (const achan) b bchan bclosed )
640
641serializeClient :: Client err meth tid addr x -> IO (Client err meth tid addr x)
642serializeClient c = do
643 mvar <- newMVar ()
644 return $ c { clientEnterQuery = \tid -> takeMVar mvar
645 , clientLeaveQuery = \tid didRespond -> putMVar mvar ()
646 }
647
648retardSend :: Int -> Client err meth tid addr x -> IO (Client err meth tid addr x)
649retardSend micros client = do
650 mvar <- newMVar () :: IO (MVar ())
651 return client { clientEnterQuery = \tid -> do
652 takeMVar mvar
653 threadDelay micros
654 putMVar mvar ()
655 }