summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Network/DatagramServer.hs136
1 files changed, 53 insertions, 83 deletions
diff --git a/src/Network/DatagramServer.hs b/src/Network/DatagramServer.hs
index eed2ced1..f212ffdf 100644
--- a/src/Network/DatagramServer.hs
+++ b/src/Network/DatagramServer.hs
@@ -76,13 +76,12 @@ module Network.DatagramServer
76 , handler 76 , handler
77 77
78 -- * Manager 78 -- * Manager
79 , MonadKRPC (..)
80 , Options (..) 79 , Options (..)
81 , def 80 , def
82 , Manager 81 , Manager
83 , newManager 82 , newManager
84 , closeManager 83 , closeManager
85 , withManager 84 -- , withManager
86 , isActive 85 , isActive
87 , listen 86 , listen
88 , Protocol(..) 87 , Protocol(..)
@@ -144,6 +143,7 @@ data Options = Options
144 143
145 -- | Maximum number of bytes to receive. 144 -- | Maximum number of bytes to receive.
146 , optMaxMsgSize :: {-# UNPACK #-} !Int 145 , optMaxMsgSize :: {-# UNPACK #-} !Int
146
147 } deriving (Show, Eq) 147 } deriving (Show, Eq)
148 148
149defaultSeedTransaction :: Int 149defaultSeedTransaction :: Int
@@ -197,29 +197,9 @@ data Manager h raw msg = Manager
197 , transactionCounter :: {-# UNPACK #-} !TransactionCounter 197 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
198 , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw) 198 , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw)
199 , handlers :: [Handler h msg raw] 199 , handlers :: [Handler h msg raw]
200 , logMsg :: Char -> String -> T.Text -> IO ()
200 } 201 }
201 202
202-- | A monad which can perform or handle queries.
203class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
204 => MonadKRPC h m raw msg | m -> h, m -> raw, m -> msg where
205
206 -- | Ask for manager.
207 getManager :: m (Manager h raw msg)
208
209 default getManager :: MonadReader (Manager h raw msg) m => m (Manager h raw msg)
210 getManager = ask
211
212 -- | Can be used to add logging for instance.
213 liftHandler :: h a -> m a
214
215 default liftHandler :: m a -> m a
216 liftHandler = id
217
218instance (MonadBaseControl IO h, MonadLogger h, MonadIO h)
219 => MonadKRPC h (ReaderT (Manager h raw msg) h) raw msg where
220
221 liftHandler = lift
222
223sockAddrFamily :: SockAddr -> Family 203sockAddrFamily :: SockAddr -> Family
224sockAddrFamily (SockAddrInet _ _ ) = AF_INET 204sockAddrFamily (SockAddrInet _ _ ) = AF_INET
225sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 205sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
@@ -229,16 +209,17 @@ sockAddrFamily (SockAddrCan _ ) = AF_CAN
229-- | Bind socket to the specified address. To enable query handling 209-- | Bind socket to the specified address. To enable query handling
230-- run 'listen'. 210-- run 'listen'.
231newManager :: Options -- ^ various protocol options; 211newManager :: Options -- ^ various protocol options;
212 -> (Char -> String -> T.Text -> IO ()) -- ^ loging function
232 -> SockAddr -- ^ address to listen on; 213 -> SockAddr -- ^ address to listen on;
233 -> [Handler h msg raw] -- ^ handlers to run on incoming queries. 214 -> [Handler h msg raw] -- ^ handlers to run on incoming queries.
234 -> IO (Manager h raw msg) -- ^ new rpc manager. 215 -> IO (Manager h raw msg) -- ^ new rpc manager.
235newManager opts @ Options {..} servAddr handlers = do 216newManager opts @ Options {..} logmsg servAddr handlers = do
236 validateOptions opts 217 validateOptions opts
237 sock <- bindServ 218 sock <- bindServ
238 tref <- newEmptyMVar 219 tref <- newEmptyMVar
239 tran <- newIORef optSeedTransaction 220 tran <- newIORef optSeedTransaction
240 calls <- newIORef M.empty 221 calls <- newIORef M.empty
241 return $ Manager sock opts tref tran calls handlers 222 return $ Manager sock opts tref tran calls handlers logmsg
242 where 223 where
243 bindServ = do 224 bindServ = do
244 let family = sockAddrFamily servAddr 225 let family = sockAddrFamily servAddr
@@ -261,11 +242,13 @@ isActive :: Manager m raw msg -> IO Bool
261isActive Manager {..} = liftIO $ isBound sock 242isActive Manager {..} = liftIO $ isBound sock
262{-# INLINE isActive #-} 243{-# INLINE isActive #-}
263 244
245#if 0
264-- | Normally you should use Control.Monad.Trans.Resource.allocate 246-- | Normally you should use Control.Monad.Trans.Resource.allocate
265-- function. 247-- function.
266withManager :: Options -> SockAddr -> [Handler h msg raw] 248withManager :: Options -> SockAddr -> [Handler h msg raw]
267 -> (Manager h raw msg -> IO a) -> IO a 249 -> (Manager h raw msg -> IO a) -> IO a
268withManager opts addr hs = bracket (newManager opts addr hs) closeManager 250withManager opts addr hs = bracket (newManager opts addr hs) closeManager
251#endif
269 252
270{----------------------------------------------------------------------- 253{-----------------------------------------------------------------------
271-- Logging 254-- Logging
@@ -306,10 +289,9 @@ genTransactionId ref = do
306 uniqueTransactionId cur 289 uniqueTransactionId cur
307 290
308-- | How many times 'query' call have been performed. 291-- | How many times 'query' call have been performed.
309getQueryCount :: MonadKRPC h m raw msg => m Int 292getQueryCount :: Manager h raw msg -> IO Int
310getQueryCount = do 293getQueryCount mgr@Manager{..} = do
311 Manager {..} <- getManager 294 curTrans <- readIORef transactionCounter
312 curTrans <- liftIO $ readIORef transactionCounter
313 return $ curTrans - optSeedTransaction options 295 return $ curTrans - optSeedTransaction options
314 296
315registerQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (CallRes msg raw) 297registerQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (CallRes msg raw)
@@ -338,27 +320,26 @@ sendQuery sock addr q = handle sockError $ sendMessage sock addr q
338-- This function should throw 'QueryFailure' exception if quered node 320-- This function should throw 'QueryFailure' exception if quered node
339-- respond with @error@ message or the query timeout expires. 321-- respond with @error@ message or the query timeout expires.
340-- 322--
341query :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg) => QueryMethod msg -> SockAddr -> a -> m b 323query :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO b
342query meth addr params = queryK meth addr params (\_ x _ -> x) 324query mgr meth addr params = queryK mgr meth addr params (\_ x _ -> x)
343 325
344-- | Like 'query' but possibly returns your externally routable IP address. 326-- | Like 'query' but possibly returns your externally routable IP address.
345query' :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg) => QueryMethod msg -> SockAddr -> a -> m (b, Maybe ReflectedIP) 327query' :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, Maybe ReflectedIP)
346query' meth addr params = queryK meth addr params (const (,)) 328query' mgr meth addr params = queryK mgr meth addr params (const (,))
347 329
348-- | Enqueue a query, but give us the complete BEncoded content sent by the 330-- | Enqueue a query, but give us the complete BEncoded content sent by the
349-- remote Node. This is useful for handling extensions that this library does 331-- remote Node. This is useful for handling extensions that this library does
350-- not otherwise support. 332-- not otherwise support.
351queryRaw :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg) => QueryMethod msg -> SockAddr -> a -> m (b, raw) 333queryRaw :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, raw)
352queryRaw meth addr params = queryK meth addr params (\raw x _ -> (x,raw)) 334queryRaw mgr meth addr params = queryK mgr meth addr params (\raw x _ -> (x,raw))
353 335
354queryK :: forall h m a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg) => 336queryK :: forall h a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) =>
355 QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> m x 337 Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> IO x
356queryK meth addr params kont = do 338queryK mgr@Manager{..} meth addr params kont = do
357 Manager {..} <- getManager
358 tid <- liftIO $ genTransactionId transactionCounter 339 tid <- liftIO $ genTransactionId transactionCounter
359 -- let queryMethod = method :: Method a b 340 -- let queryMethod = method :: Method a b
360 let signature = querySignature meth tid addr 341 let signature = querySignature meth tid addr
361 $(logDebugS) "query.sending" signature 342 logMsg 'D' "query.sending" signature
362 343
363 mres <- liftIO $ do 344 mres <- liftIO $ do
364 ares <- registerQuery (tid, addr) pendingCalls 345 ares <- registerQuery (tid, addr) pendingCalls
@@ -384,12 +365,12 @@ queryK meth addr params kont = do
384 365
385 case mres of 366 case mres of
386 Just res -> do 367 Just res -> do
387 $(logDebugS) "query.responded" $ signature 368 logMsg 'D' "query.responded" $ signature
388 return res 369 return res
389 370
390 Nothing -> do 371 Nothing -> do
391 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls 372 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
392 $(logWarnS) "query.not_responding" $ signature <> " for " <> 373 logMsg 'W' "query.not_responding" $ signature <> " for " <>
393 T.pack (show (optQueryTimeout options)) <> " seconds" 374 T.pack (show (optQueryTimeout options)) <> " seconds"
394 throw $ TimeoutExpired 375 throw $ TimeoutExpired
395 376
@@ -440,31 +421,30 @@ handler name body = (name, wrapper)
440 Left e -> pure $ Left e 421 Left e -> pure $ Left e
441 Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a) 422 Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a)
442 423
443runHandler :: ( MonadKRPC h m raw msg 424runHandler :: ( Envelope msg
444 , Envelope msg
445 , Show (QueryMethod msg) 425 , Show (QueryMethod msg)
446 , Serialize (TransactionID msg)) 426 , Serialize (TransactionID msg))
447 => QueryMethod msg -> HandlerBody h msg raw -> SockAddr -> msg raw -> m (KResult msg raw) 427 => Manager IO raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw)
448runHandler meth h addr m = Lifted.catches wrapper failbacks 428runHandler mgr@Manager{..} meth h addr m = Lifted.catches wrapper failbacks
449 where 429 where
450 signature = querySignature meth (envelopeTransaction m) addr 430 signature = querySignature meth (envelopeTransaction m) addr
451 431
452 wrapper = do 432 wrapper = do
453 $(logDebugS) "handler.quered" signature 433 logMsg 'D' "handler.quered" signature
454 result <- liftHandler (h addr m) 434 result <- h addr m
455 435
456 case result of 436 case result of
457 Left msg -> do 437 Left msg -> do
458 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg 438 logMsg 'D' "handler.bad_query" $ signature <> " !" <> T.pack msg
459 return $ Left $ KError ProtocolError (BC.pack msg) (envelopeTransaction m) 439 return $ Left $ KError ProtocolError (BC.pack msg) (envelopeTransaction m)
460 440
461 Right a -> do -- KQueryArgs 441 Right a -> do -- KQueryArgs
462 $(logDebugS) "handler.success" signature 442 logMsg 'D' "handler.success" signature
463 return $ Right a 443 return $ Right a
464 444
465 failbacks = 445 failbacks =
466 [ E.Handler $ \ (e :: HandlerFailure) -> do 446 [ E.Handler $ \ (e :: HandlerFailure) -> do
467 $(logDebugS) "handler.failed" signature 447 logMsg 'D' "handler.failed" signature
468 return $ Left $ KError ProtocolError (prettyHF e) (envelopeTransaction m) 448 return $ Left $ KError ProtocolError (prettyHF e) (envelopeTransaction m)
469 449
470 450
@@ -478,17 +458,15 @@ runHandler meth h addr m = Lifted.catches wrapper failbacks
478 return $ Left $ KError GenericError (BC.pack (show e)) (envelopeTransaction m) 458 return $ Left $ KError GenericError (BC.pack (show e)) (envelopeTransaction m)
479 ] 459 ]
480 460
481dispatchHandler :: ( MonadKRPC h m raw msg 461dispatchHandler :: ( Eq (QueryMethod msg)
482 , Eq (QueryMethod msg)
483 , Show (QueryMethod msg) 462 , Show (QueryMethod msg)
484 , Serialize (TransactionID msg) 463 , Serialize (TransactionID msg)
485 , Envelope msg 464 , Envelope msg
486 ) => QueryMethod msg -> msg raw -> SockAddr -> m (KResult msg raw) 465 ) => Manager IO raw msg -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw)
487dispatchHandler meth q addr = do 466dispatchHandler mgr@Manager{..} meth q addr = do
488 Manager {..} <- getManager
489 case L.lookup meth handlers of 467 case L.lookup meth handlers of
490 Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q) 468 Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q)
491 Just h -> runHandler meth h addr q 469 Just h -> runHandler mgr meth h addr q
492 470
493{----------------------------------------------------------------------- 471{-----------------------------------------------------------------------
494-- Listener 472-- Listener
@@ -501,16 +479,14 @@ dispatchHandler meth q addr = do
501-- peer B fork too many threads 479-- peer B fork too many threads
502-- ... space leak 480-- ... space leak
503-- 481--
504handleQuery :: ( MonadKRPC h m raw msg 482handleQuery :: ( WireFormat raw msg
505 , WireFormat raw msg
506 , Eq (QueryMethod msg) 483 , Eq (QueryMethod msg)
507 , Show (QueryMethod msg) 484 , Show (QueryMethod msg)
508 , Serialize (TransactionID msg) 485 , Serialize (TransactionID msg)
509 ) => QueryMethod msg -> raw -> msg raw -> SockAddr -> m () 486 ) => Manager IO raw msg -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO ()
510handleQuery meth raw q addr = void $ fork $ do 487handleQuery mgr@Manager{..} meth raw q addr = void $ fork $ do
511 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" 488 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
512 Manager {..} <- getManager 489 res <- dispatchHandler mgr meth q addr
513 res <- dispatchHandler meth q addr
514 let res' = either buildError Just res 490 let res' = either buildError Just res
515 ctx = error "TODO TOX ToxCipherContext 2 or () for Mainline" 491 ctx = error "TODO TOX ToxCipherContext 2 or () for Mainline"
516 resbs = fmap (encodeHeaders ctx) res' :: Maybe BS.ByteString 492 resbs = fmap (encodeHeaders ctx) res' :: Maybe BS.ByteString
@@ -523,12 +499,10 @@ handleQuery meth raw q addr = void $ fork $ do
523 -- ] 499 -- ]
524 maybe (return ()) (sendMessage sock addr) resbs 500 maybe (return ()) (sendMessage sock addr) resbs
525 501
526handleResponse :: ( MonadKRPC h m raw msg 502handleResponse :: ( Ord (TransactionID msg)
527 , Ord (TransactionID msg)
528 , Envelope msg 503 , Envelope msg
529 ) => raw -> KResult msg raw -> SockAddr -> m () 504 ) => Manager IO raw msg -> raw -> KResult msg raw -> SockAddr -> IO ()
530handleResponse raw result addr = do 505handleResponse mgr@Manager{..} raw result addr = do
531 Manager {..} <- getManager
532 liftIO $ do 506 liftIO $ do
533 let resultId = either errorId envelopeTransaction result 507 let resultId = either errorId envelopeTransaction result
534 mcall <- unregisterQuery (resultId, addr) pendingCalls 508 mcall <- unregisterQuery (resultId, addr) pendingCalls
@@ -540,16 +514,14 @@ data Protocol raw (msg :: * -> *) = Protocol { rawProxy :: !(Proxy raw)
540 , msgProxy :: !(Proxy msg) 514 , msgProxy :: !(Proxy msg)
541 } 515 }
542 516
543listener :: forall h m raw msg. 517listener :: forall raw msg.
544 ( MonadKRPC h m raw msg 518 ( WireFormat raw msg
545 , WireFormat raw msg
546 , Ord (TransactionID msg) 519 , Ord (TransactionID msg)
547 , Eq (QueryMethod msg) 520 , Eq (QueryMethod msg)
548 , Show (QueryMethod msg) 521 , Show (QueryMethod msg)
549 , Serialize (TransactionID msg) 522 , Serialize (TransactionID msg)
550 ) => Protocol raw msg -> m () 523 ) => Manager IO raw msg -> Protocol raw msg -> IO ()
551listener p = do 524listener mgr@Manager{..} p = do
552 Manager {..} <- getManager
553 fix $ \again -> do 525 fix $ \again -> do
554 let ctx = error "TODO TOX ToxCipherContext or () for Mainline" 526 let ctx = error "TODO TOX ToxCipherContext or () for Mainline"
555 (bs, addr) <- liftIO $ do 527 (bs, addr) <- liftIO $ do
@@ -560,9 +532,9 @@ listener p = do
560 return () -- Without transaction id, error message isn't very useful. 532 return () -- Without transaction id, error message isn't very useful.
561 Right (raw,m) -> 533 Right (raw,m) ->
562 case envelopeClass m of 534 case envelopeClass m of
563 Query meth -> handleQuery meth (raw::raw) m addr 535 Query meth -> handleQuery mgr meth (raw::raw) m addr
564 Response _ -> handleResponse raw (Right m) addr 536 Response _ -> handleResponse mgr raw (Right m) addr
565 Error e -> handleResponse raw (Left e) addr 537 Error e -> handleResponse mgr raw (Left e) addr
566 538
567 again 539 again
568 where 540 where
@@ -574,17 +546,15 @@ listener p = do
574 546
575-- | Should be run before any 'query', otherwise they will never 547-- | Should be run before any 'query', otherwise they will never
576-- succeed. 548-- succeed.
577listen :: ( MonadKRPC h m raw msg 549listen :: ( WireFormat raw msg
578 , WireFormat raw msg
579 , Ord (TransactionID msg) 550 , Ord (TransactionID msg)
580 , Eq (QueryMethod msg) 551 , Eq (QueryMethod msg)
581 , Show (QueryMethod msg) 552 , Show (QueryMethod msg)
582 , Serialize (TransactionID msg) 553 , Serialize (TransactionID msg)
583 ) => Protocol raw msg -> m () 554 ) => Manager IO raw msg -> Protocol raw msg -> IO ()
584listen p = do 555listen mgr@Manager{..} p = do
585 Manager {..} <- getManager
586 tid <- fork $ do 556 tid <- fork $ do
587 myThreadId >>= liftIO . flip labelThread "KRPC.listen" 557 myThreadId >>= liftIO . flip labelThread "KRPC.listen"
588 listener p `Lifted.finally` 558 listener mgr p `Lifted.finally`
589 liftIO (takeMVar listenerThread) 559 liftIO (takeMVar listenerThread)
590 liftIO $ putMVar listenerThread tid 560 liftIO $ putMVar listenerThread tid