summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-06-08 13:03:00 -0400
committerjoe <joe@jerkface.net>2017-06-08 13:03:00 -0400
commit33db41f8f2c80172aaf6914bd2900b564bb8b82a (patch)
tree34b3808ef0d8e702dbdacf4e9ad22d1520f487d1 /src
parent8c33deac14ca92ef67afc7fbcd3f67bc19317f88 (diff)
Merged Network.KRPC.Manager into Network.KRPC.
Diffstat (limited to 'src')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs2
-rw-r--r--src/Network/KRPC.hs565
-rw-r--r--src/Network/KRPC/Manager.hs596
3 files changed, 565 insertions, 598 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 56ea262a..a872fc72 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -83,7 +83,7 @@ import Data.Time.Clock.POSIX
83 83
84import Network.KRPC as KRPC hiding (Options, def) 84import Network.KRPC as KRPC hiding (Options, def)
85import Network.KRPC.Message (ReflectedIP(..)) 85import Network.KRPC.Message (ReflectedIP(..))
86import Network.KRPC.Manager (QueryFailure(..)) 86import Network.KRPC (QueryFailure(..))
87import Data.Torrent 87import Data.Torrent
88import Network.BitTorrent.DHT.Message 88import Network.BitTorrent.DHT.Message
89import Network.BitTorrent.DHT.Routing as R 89import Network.BitTorrent.DHT.Routing as R
diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs
index d185fb4c..22ab81f2 100644
--- a/src/Network/KRPC.hs
+++ b/src/Network/KRPC.hs
@@ -50,6 +50,16 @@
50-- 50--
51-- For protocol details see "Network.KRPC.Message" module. 51-- For protocol details see "Network.KRPC.Message" module.
52-- 52--
53{-# LANGUAGE CPP #-}
54{-# LANGUAGE OverloadedStrings #-}
55{-# LANGUAGE FlexibleInstances #-}
56{-# LANGUAGE FlexibleContexts #-}
57{-# LANGUAGE ScopedTypeVariables #-}
58{-# LANGUAGE DefaultSignatures #-}
59{-# LANGUAGE MultiParamTypeClasses #-}
60{-# LANGUAGE FunctionalDependencies #-}
61{-# LANGUAGE DeriveDataTypeable #-}
62{-# LANGUAGE TemplateHaskell #-}
53module Network.KRPC 63module Network.KRPC
54 ( -- * Methods 64 ( -- * Methods
55 Method 65 Method
@@ -87,5 +97,558 @@ module Network.KRPC
87import Data.Default.Class 97import Data.Default.Class
88import Network.KRPC.Message 98import Network.KRPC.Message
89import Network.KRPC.Method 99import Network.KRPC.Method
90import Network.KRPC.Manager
91import Network.Socket (SockAddr (..)) 100import Network.Socket (SockAddr (..))
101
102import Control.Applicative
103#ifdef THREAD_DEBUG
104import Control.Concurrent.Lifted.Instrument
105#else
106import GHC.Conc (labelThread)
107import Control.Concurrent.Lifted
108#endif
109import Control.Exception hiding (Handler)
110import qualified Control.Exception.Lifted as E (Handler (..))
111import Control.Exception.Lifted as Lifted (catches, finally)
112import Control.Monad
113import Control.Monad.Logger
114import Control.Monad.Reader
115import Control.Monad.Trans.Control
116#ifdef VERSION_bencoding
117import Data.BEncode as BE
118import Data.BEncode.Internal as BE
119import Data.BEncode.Pretty (showBEncode)
120#else
121import qualified Data.Tox as Tox
122#endif
123import qualified Data.ByteString.Base16 as Base16
124import Data.ByteString as BS
125import Data.ByteString.Char8 as BC
126import Data.ByteString.Lazy as BL
127import Data.Default.Class
128import Data.IORef
129import Data.List as L
130import Data.Map as M
131import Data.Monoid
132import Data.Serialize as S
133import Data.Text as T
134import Data.Text.Encoding as T
135import Data.Tuple
136import Data.Typeable
137import Network.RPC
138import Network.KRPC.Message
139import Network.KRPC.Method hiding (Envelope)
140import qualified Network.KRPC.Method as KRPC (Envelope)
141import Network.Socket hiding (listen)
142import Network.Socket.ByteString as BS
143import System.IO.Error
144import System.Timeout
145#ifdef VERSION_bencoding
146import Network.DHT.Mainline
147#endif
148
149
150{-----------------------------------------------------------------------
151-- Options
152-----------------------------------------------------------------------}
153
154-- | RPC manager options.
155data Options = Options
156 { -- | Initial 'TransactionId' incremented with each 'query';
157 optSeedTransaction :: {-# UNPACK #-} !Int
158
159 -- | Time to wait for response from remote node, in seconds.
160 , optQueryTimeout :: {-# UNPACK #-} !Int
161
162 -- | Maximum number of bytes to receive.
163 , optMaxMsgSize :: {-# UNPACK #-} !Int
164 } deriving (Show, Eq)
165
166defaultSeedTransaction :: Int
167defaultSeedTransaction = 0
168
169defaultQueryTimeout :: Int
170defaultQueryTimeout = 120
171
172defaultMaxMsgSize :: Int
173defaultMaxMsgSize = 64 * 1024
174
175-- | Permissive defaults.
176instance Default Options where
177 def = Options
178 { optSeedTransaction = defaultSeedTransaction
179 , optQueryTimeout = defaultQueryTimeout
180 , optMaxMsgSize = defaultMaxMsgSize
181 }
182
183validateOptions :: Options -> IO ()
184validateOptions Options {..}
185 | optQueryTimeout < 1
186 = throwIO (userError "krpc: non-positive query timeout")
187 | optMaxMsgSize < 1
188 = throwIO (userError "krpc: non-positive buffer size")
189 | otherwise = return ()
190
191{-----------------------------------------------------------------------
192-- Options
193-----------------------------------------------------------------------}
194
195type KResult = Either KError KMessage -- Response
196
197type TransactionCounter = IORef Int
198type CallId = (TransactionId, SockAddr)
199type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response)
200type PendingCalls = IORef (Map CallId CallRes)
201
202type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v))
203
204-- | Handler is a function which will be invoked then some /remote/
205-- node querying /this/ node.
206type Handler h msg v = (MethodName, HandlerBody h msg v)
207
208-- | Keep track pending queries made by /this/ node and handle queries
209-- made by /remote/ nodes.
210data Manager h = Manager
211 { sock :: !Socket
212 , options :: !Options
213 , listenerThread :: !(MVar ThreadId)
214 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
215 , pendingCalls :: {-# UNPACK #-} !PendingCalls
216#ifdef VERSION_bencoding
217 , handlers :: [Handler h KMessageOf BValue]
218#else
219 , handlers :: [Handler h KMessageOf BC.ByteString]
220#endif
221 }
222
223-- | A monad which can perform or handle queries.
224class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
225 => MonadKRPC h m | m -> h where
226
227 -- | Ask for manager.
228 getManager :: m (Manager h)
229
230 default getManager :: MonadReader (Manager h) m => m (Manager h)
231 getManager = ask
232
233 -- | Can be used to add logging for instance.
234 liftHandler :: h a -> m a
235
236 default liftHandler :: m a -> m a
237 liftHandler = id
238
239instance (MonadBaseControl IO h, MonadLogger h, MonadIO h)
240 => MonadKRPC h (ReaderT (Manager h) h) where
241
242 liftHandler = lift
243
244sockAddrFamily :: SockAddr -> Family
245sockAddrFamily (SockAddrInet _ _ ) = AF_INET
246sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
247sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
248sockAddrFamily (SockAddrCan _ ) = AF_CAN
249
250-- | Bind socket to the specified address. To enable query handling
251-- run 'listen'.
252newManager :: Options -- ^ various protocol options;
253 -> SockAddr -- ^ address to listen on;
254#ifdef VERSION_bencoding
255 -> [Handler h KMessageOf BValue] -- ^ handlers to run on incoming queries.
256#else
257 -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries.
258#endif
259 -> IO (Manager h) -- ^ new rpc manager.
260newManager opts @ Options {..} servAddr handlers = do
261 validateOptions opts
262 sock <- bindServ
263 tref <- newEmptyMVar
264 tran <- newIORef optSeedTransaction
265 calls <- newIORef M.empty
266 return $ Manager sock opts tref tran calls handlers
267 where
268 bindServ = do
269 let family = sockAddrFamily servAddr
270 sock <- socket family Datagram defaultProtocol
271 when (family == AF_INET6) $ do
272 setSocketOption sock IPv6Only 0
273 bindSocket sock servAddr
274 return sock
275
276-- | Unblock all pending calls and close socket.
277closeManager :: Manager m -> IO ()
278closeManager Manager {..} = do
279 maybe (return ()) killThread =<< tryTakeMVar listenerThread
280 -- TODO unblock calls
281 close sock
282
283-- | Check if the manager is still active. Manager becomes active
284-- until 'closeManager' called.
285isActive :: Manager m -> IO Bool
286isActive Manager {..} = liftIO $ isBound sock
287{-# INLINE isActive #-}
288
289-- | Normally you should use Control.Monad.Trans.Resource.allocate
290-- function.
291#ifdef VERSION_bencoding
292withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue]
293#else
294withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString]
295#endif
296 -> (Manager h -> IO a) -> IO a
297withManager opts addr hs = bracket (newManager opts addr hs) closeManager
298
299{-----------------------------------------------------------------------
300-- Logging
301-----------------------------------------------------------------------}
302
303-- TODO prettify log messages
304querySignature :: MethodName -> TransactionId -> SockAddr -> Text
305querySignature name transaction addr = T.concat
306#ifdef VERSION_bencoding
307 [ "&", T.decodeUtf8 name
308 , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction
309#else
310 [ "&", T.pack (show name)
311 , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction)
312#endif
313 , " @", T.pack (show addr)
314 ]
315
316{-----------------------------------------------------------------------
317-- Client
318-----------------------------------------------------------------------}
319-- we don't need to know about TransactionId while performing query,
320-- so we introduce QueryFailure exceptions
321
322-- | Used to signal 'query' errors.
323data QueryFailure
324 = SendFailed -- ^ unable to send query;
325 | QueryFailed ErrorCode Text -- ^ remote node return error;
326 | TimeoutExpired -- ^ remote node not responding.
327 deriving (Show, Eq, Typeable)
328
329instance Exception QueryFailure
330
331sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
332sendMessage sock addr a = do
333 liftIO $ sendManyTo sock [a] addr
334
335genTransactionId :: TransactionCounter -> IO TransactionId
336genTransactionId ref = do
337 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur)
338#ifdef VERSION_bencoding
339 return $ BC.pack (show cur)
340#else
341 return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ')
342#endif
343
344-- | How many times 'query' call have been performed.
345getQueryCount :: MonadKRPC h m => m Int
346getQueryCount = do
347 Manager {..} <- getManager
348 curTrans <- liftIO $ readIORef transactionCounter
349 return $ curTrans - optSeedTransaction options
350
351registerQuery :: CallId -> PendingCalls -> IO CallRes
352registerQuery cid ref = do
353 ares <- newEmptyMVar
354 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ())
355 return ares
356
357-- simultaneous M.lookup and M.delete guarantees that we never get two
358-- or more responses to the same query
359unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes)
360unregisterQuery cid ref = do
361 atomicModifyIORef' ref $ swap .
362 M.updateLookupWithKey (const (const Nothing)) cid
363
364
365-- (sendmsg EINVAL)
366sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO ()
367sendQuery sock addr q = handle sockError $ sendMessage sock addr q
368 where
369 sockError :: IOError -> IO ()
370 sockError _ = throwIO SendFailed
371
372-- | Enqueue query to the given node.
373--
374-- This function should throw 'QueryFailure' exception if quered node
375-- respond with @error@ message or the query timeout expires.
376--
377query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b
378query addr params = queryK addr params (\_ x _ -> x)
379
380-- | Like 'query' but possibly returns your externally routable IP address.
381query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP)
382query' addr params = queryK addr params (const (,))
383
384-- | Enqueue a query, but give us the complete BEncoded content sent by the
385-- remote Node. This is useful for handling extensions that this library does
386-- not otherwise support.
387queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs)
388queryRaw addr params = queryK addr params (\raw x _ -> (x,raw))
389
390queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) =>
391 SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x
392queryK addr params kont = do
393 Manager {..} <- getManager
394 tid <- liftIO $ genTransactionId transactionCounter
395 let queryMethod = method :: Method a b
396 let signature = querySignature (methodName queryMethod) tid addr
397 $(logDebugS) "query.sending" signature
398
399 mres <- liftIO $ do
400 ares <- registerQuery (tid, addr) pendingCalls
401
402#ifdef VERSION_bencoding
403 let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid)
404 qb = encodePayload q :: KMessage
405 qbs = encodeHeaders () qb :: BC.ByteString
406#else
407 let q = Tox.Message (methodName queryMethod) cli tid params
408 cli = error "TODO TOX client node id"
409 ctx = error "TODO TOX ToxCipherContext"
410 qb = encodePayload q :: Tox.Message BC.ByteString
411 qbs = encodeHeaders ctx qb :: BC.ByteString
412#endif
413 sendQuery sock addr qbs
414 `onException` unregisterQuery (tid, addr) pendingCalls
415
416 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
417 (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult)
418 case res of
419#ifdef VERSION_bencoding
420 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
421 Right (R (KResponse {..})) ->
422 case fromBEncode respVals of
423 Right r -> pure $ kont raw r respIP
424#else
425 Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR"
426 Right (Tox.Message {..}) ->
427 case S.decode msgPayload of
428 Right r -> pure $ kont raw r Nothing
429#endif
430 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)
431
432 case mres of
433 Just res -> do
434 $(logDebugS) "query.responded" $ signature
435 return res
436
437 Nothing -> do
438 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
439 $(logWarnS) "query.not_responding" $ signature <> " for " <>
440 T.pack (show (optQueryTimeout options)) <> " seconds"
441 throw $ TimeoutExpired
442
443{-----------------------------------------------------------------------
444-- Handlers
445-----------------------------------------------------------------------}
446-- we already throw:
447--
448-- * ErrorCode(MethodUnknown) in the 'dispatchHandler';
449--
450-- * ErrorCode(ServerError) in the 'runHandler';
451--
452-- * ErrorCode(GenericError) in the 'runHandler' (those can be
453-- async exception too)
454--
455-- so HandlerFailure should cover *only* 'ProtocolError's.
456
457-- | Used to signal protocol errors.
458data HandlerFailure
459 = BadAddress -- ^ for e.g.: node calls herself;
460 | InvalidParameter Text -- ^ for e.g.: bad session token.
461 deriving (Show, Eq, Typeable)
462
463instance Exception HandlerFailure
464
465prettyHF :: HandlerFailure -> BS.ByteString
466prettyHF BadAddress = T.encodeUtf8 "bad address"
467prettyHF (InvalidParameter reason) = T.encodeUtf8 $
468 "invalid parameter: " <> reason
469
470prettyQF :: QueryFailure -> BS.ByteString
471prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
472 <> T.pack (show e)
473
474-- | Make handler from handler function. Any thrown exception will be
475-- supressed and send over the wire back to the querying node.
476--
477-- If the handler make some 'query' normally it /should/ handle
478-- corresponding 'QueryFailure's.
479--
480handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b)
481 => (SockAddr -> a -> h b) -> Handler h msg raw
482handler body = (name, wrapper)
483 where
484 Method name = method :: Method a b
485 wrapper :: SockAddr -> msg raw -> h (Either String (msg raw))
486 wrapper addr args =
487 case decodePayload args of
488 Left e -> pure $ Left e
489 Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a)
490
491runHandler :: MonadKRPC h m
492#ifdef VERSION_bencoding
493 => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult
494#else
495 => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult
496#endif
497runHandler h addr m = Lifted.catches wrapper failbacks
498 where
499 signature = querySignature (queryMethod m) (queryId m) addr
500
501 wrapper = do
502 $(logDebugS) "handler.quered" signature
503#ifdef VERSION_bencoding
504 result <- liftHandler (h addr (Q m))
505#else
506 result <- liftHandler (h addr m)
507#endif
508
509 case result of
510 Left msg -> do
511 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg
512#ifdef VERSION_bencoding
513 return $ Left $ KError ProtocolError (BC.pack msg) (queryId m)
514#else
515 return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m)
516#endif
517
518 Right a -> do -- KQueryArgs
519 $(logDebugS) "handler.success" signature
520 return $ Right a
521
522 failbacks =
523 [ E.Handler $ \ (e :: HandlerFailure) -> do
524 $(logDebugS) "handler.failed" signature
525#ifdef VERSION_bencoding
526 return $ Left $ KError ProtocolError (prettyHF e) (queryId m)
527#else
528 return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m)
529#endif
530
531
532 -- may happen if handler makes query and fail
533 , E.Handler $ \ (e :: QueryFailure) -> do
534#ifdef VERSION_bencoding
535 return $ Left $ KError ServerError (prettyQF e) (queryId m)
536#else
537 return $ Left $ decodeError "TODO TOX ServerError" (queryId m)
538#endif
539
540 -- since handler thread exit after sendMessage we can safely
541 -- suppress async exception here
542 , E.Handler $ \ (e :: SomeException) -> do
543#ifdef VERSION_bencoding
544 return $ Left $ KError GenericError (BC.pack (show e)) (queryId m)
545#else
546 return $ Left $ decodeError "TODO TOX GenericError" (queryId m)
547#endif
548 ]
549
550dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult
551dispatchHandler q addr = do
552 Manager {..} <- getManager
553 case L.lookup (queryMethod q) handlers of
554#ifdef VERSION_bencoding
555 Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q)
556#else
557 Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q)
558#endif
559 Just h -> runHandler h addr q
560
561{-----------------------------------------------------------------------
562-- Listener
563-----------------------------------------------------------------------}
564
565-- TODO bound amount of parallel handler *threads*:
566--
567-- peer A flooding with find_node
568-- peer B trying to ping peer C
569-- peer B fork too many threads
570-- ... space leak
571--
572handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m ()
573handleQuery raw q addr = void $ fork $ do
574 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
575 Manager {..} <- getManager
576 res <- dispatchHandler q addr
577#ifdef VERSION_bencoding
578 let res' = either E id res
579 resbe = either toBEncode toBEncode res
580 $(logOther "q") $ T.unlines
581 [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
582 , "==>"
583 , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
584 ]
585 sendMessage sock addr $ encodeHeaders () res'
586#else
587 -- Errors not sent for Tox.
588 let ctx = error "TODO TOX ToxCipherContext 2"
589 either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res
590#endif
591
592handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m ()
593handleResponse raw result addr = do
594 Manager {..} <- getManager
595 liftIO $ do
596#ifdef VERSION_bencoding
597 let resultId = either errorId envelopeTransaction result
598#else
599 let resultId = either Tox.msgNonce Tox.msgNonce result
600#endif
601 mcall <- unregisterQuery (resultId, addr) pendingCalls
602 case mcall of
603 Nothing -> return ()
604 Just ares -> putMVar ares (raw,result)
605
606#ifdef VERSION_bencoding
607handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m ()
608handleMessage raw (Q q) = handleQuery raw q
609handleMessage raw (R r) = handleResponse raw (Right (R r))
610handleMessage raw (E e) = handleResponse raw (Left e)
611#else
612handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m ()
613handleMessage raw q | Tox.isQuery q = handleQuery raw q
614handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r)
615handleMessage raw e | Tox.isError e = handleResponse raw (Left e)
616#endif
617
618listener :: MonadKRPC h m => m ()
619listener = do
620 Manager {..} <- getManager
621 fix $ \again -> do
622 let ctx = error "TODO TOX ToxCipherContext 3"
623 (bs, addr) <- liftIO $ do
624 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
625#ifdef VERSION_bencoding
626 case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of
627#else
628 case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of
629#endif
630 -- TODO ignore unknown messages at all?
631#ifdef VERSION_bencoding
632 Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage)
633#else
634 Left _ -> return () -- TODO TOX send unknownMessage error
635#endif
636 Right (raw,m) -> handleMessage raw m addr
637 again
638 where
639 exceptions :: IOError -> IO (BS.ByteString, SockAddr)
640 exceptions e
641 -- packets with empty payload may trigger eof exception
642 | isEOFError e = return ("", SockAddrInet 0 0)
643 | otherwise = throwIO e
644
645-- | Should be run before any 'query', otherwise they will never
646-- succeed.
647listen :: MonadKRPC h m => m ()
648listen = do
649 Manager {..} <- getManager
650 tid <- fork $ do
651 myThreadId >>= liftIO . flip labelThread "KRPC.listen"
652 listener `Lifted.finally`
653 liftIO (takeMVar listenerThread)
654 liftIO $ putMVar listenerThread tid
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs
deleted file mode 100644
index efd59f32..00000000
--- a/src/Network/KRPC/Manager.hs
+++ /dev/null
@@ -1,596 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013, 2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Normally, you don't need to import this module.
9--
10{-# LANGUAGE CPP #-}
11{-# LANGUAGE OverloadedStrings #-}
12{-# LANGUAGE FlexibleInstances #-}
13{-# LANGUAGE FlexibleContexts #-}
14{-# LANGUAGE ScopedTypeVariables #-}
15{-# LANGUAGE DefaultSignatures #-}
16{-# LANGUAGE MultiParamTypeClasses #-}
17{-# LANGUAGE FunctionalDependencies #-}
18{-# LANGUAGE DeriveDataTypeable #-}
19{-# LANGUAGE TemplateHaskell #-}
20module Network.KRPC.Manager
21 ( -- * Manager
22 MonadKRPC (..)
23 , Options (..)
24 , Manager
25 , newManager
26 , closeManager
27 , withManager
28 , isActive
29 , listen
30
31 -- * Queries
32 , QueryFailure (..)
33 , query
34 , query'
35 , queryRaw
36 , getQueryCount
37
38 -- * Handlers
39 , HandlerFailure (..)
40 , Handler
41 , handler
42 ) where
43
44import Control.Applicative
45#ifdef THREAD_DEBUG
46import Control.Concurrent.Lifted.Instrument
47#else
48import GHC.Conc (labelThread)
49import Control.Concurrent.Lifted
50#endif
51import Control.Exception hiding (Handler)
52import qualified Control.Exception.Lifted as E (Handler (..))
53import Control.Exception.Lifted as Lifted (catches, finally)
54import Control.Monad
55import Control.Monad.Logger
56import Control.Monad.Reader
57import Control.Monad.Trans.Control
58#ifdef VERSION_bencoding
59import Data.BEncode as BE
60import Data.BEncode.Internal as BE
61import Data.BEncode.Pretty (showBEncode)
62#else
63import qualified Data.Tox as Tox
64#endif
65import qualified Data.ByteString.Base16 as Base16
66import Data.ByteString as BS
67import Data.ByteString.Char8 as BC
68import Data.ByteString.Lazy as BL
69import Data.Default.Class
70import Data.IORef
71import Data.List as L
72import Data.Map as M
73import Data.Monoid
74import Data.Serialize as S
75import Data.Text as T
76import Data.Text.Encoding as T
77import Data.Tuple
78import Data.Typeable
79import Network.RPC
80import Network.KRPC.Message
81import Network.KRPC.Method hiding (Envelope)
82import qualified Network.KRPC.Method as KRPC (Envelope)
83import Network.Socket hiding (listen)
84import Network.Socket.ByteString as BS
85import System.IO.Error
86import System.Timeout
87#ifdef VERSION_bencoding
88import Network.DHT.Mainline
89#endif
90
91
92{-----------------------------------------------------------------------
93-- Options
94-----------------------------------------------------------------------}
95
96-- | RPC manager options.
97data Options = Options
98 { -- | Initial 'TransactionId' incremented with each 'query';
99 optSeedTransaction :: {-# UNPACK #-} !Int
100
101 -- | Time to wait for response from remote node, in seconds.
102 , optQueryTimeout :: {-# UNPACK #-} !Int
103
104 -- | Maximum number of bytes to receive.
105 , optMaxMsgSize :: {-# UNPACK #-} !Int
106 } deriving (Show, Eq)
107
108defaultSeedTransaction :: Int
109defaultSeedTransaction = 0
110
111defaultQueryTimeout :: Int
112defaultQueryTimeout = 120
113
114defaultMaxMsgSize :: Int
115defaultMaxMsgSize = 64 * 1024
116
117-- | Permissive defaults.
118instance Default Options where
119 def = Options
120 { optSeedTransaction = defaultSeedTransaction
121 , optQueryTimeout = defaultQueryTimeout
122 , optMaxMsgSize = defaultMaxMsgSize
123 }
124
125validateOptions :: Options -> IO ()
126validateOptions Options {..}
127 | optQueryTimeout < 1
128 = throwIO (userError "krpc: non-positive query timeout")
129 | optMaxMsgSize < 1
130 = throwIO (userError "krpc: non-positive buffer size")
131 | otherwise = return ()
132
133{-----------------------------------------------------------------------
134-- Options
135-----------------------------------------------------------------------}
136
137type KResult = Either KError KMessage -- Response
138
139type TransactionCounter = IORef Int
140type CallId = (TransactionId, SockAddr)
141type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response)
142type PendingCalls = IORef (Map CallId CallRes)
143
144type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v))
145
146-- | Handler is a function which will be invoked then some /remote/
147-- node querying /this/ node.
148type Handler h msg v = (MethodName, HandlerBody h msg v)
149
150-- | Keep track pending queries made by /this/ node and handle queries
151-- made by /remote/ nodes.
152data Manager h = Manager
153 { sock :: !Socket
154 , options :: !Options
155 , listenerThread :: !(MVar ThreadId)
156 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
157 , pendingCalls :: {-# UNPACK #-} !PendingCalls
158#ifdef VERSION_bencoding
159 , handlers :: [Handler h KMessageOf BValue]
160#else
161 , handlers :: [Handler h KMessageOf BC.ByteString]
162#endif
163 }
164
165-- | A monad which can perform or handle queries.
166class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
167 => MonadKRPC h m | m -> h where
168
169 -- | Ask for manager.
170 getManager :: m (Manager h)
171
172 default getManager :: MonadReader (Manager h) m => m (Manager h)
173 getManager = ask
174
175 -- | Can be used to add logging for instance.
176 liftHandler :: h a -> m a
177
178 default liftHandler :: m a -> m a
179 liftHandler = id
180
181instance (MonadBaseControl IO h, MonadLogger h, MonadIO h)
182 => MonadKRPC h (ReaderT (Manager h) h) where
183
184 liftHandler = lift
185
186sockAddrFamily :: SockAddr -> Family
187sockAddrFamily (SockAddrInet _ _ ) = AF_INET
188sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
189sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
190sockAddrFamily (SockAddrCan _ ) = AF_CAN
191
192-- | Bind socket to the specified address. To enable query handling
193-- run 'listen'.
194newManager :: Options -- ^ various protocol options;
195 -> SockAddr -- ^ address to listen on;
196#ifdef VERSION_bencoding
197 -> [Handler h KMessageOf BValue] -- ^ handlers to run on incoming queries.
198#else
199 -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries.
200#endif
201 -> IO (Manager h) -- ^ new rpc manager.
202newManager opts @ Options {..} servAddr handlers = do
203 validateOptions opts
204 sock <- bindServ
205 tref <- newEmptyMVar
206 tran <- newIORef optSeedTransaction
207 calls <- newIORef M.empty
208 return $ Manager sock opts tref tran calls handlers
209 where
210 bindServ = do
211 let family = sockAddrFamily servAddr
212 sock <- socket family Datagram defaultProtocol
213 when (family == AF_INET6) $ do
214 setSocketOption sock IPv6Only 0
215 bindSocket sock servAddr
216 return sock
217
218-- | Unblock all pending calls and close socket.
219closeManager :: Manager m -> IO ()
220closeManager Manager {..} = do
221 maybe (return ()) killThread =<< tryTakeMVar listenerThread
222 -- TODO unblock calls
223 close sock
224
225-- | Check if the manager is still active. Manager becomes active
226-- until 'closeManager' called.
227isActive :: Manager m -> IO Bool
228isActive Manager {..} = liftIO $ isBound sock
229{-# INLINE isActive #-}
230
231-- | Normally you should use Control.Monad.Trans.Resource.allocate
232-- function.
233#ifdef VERSION_bencoding
234withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue]
235#else
236withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString]
237#endif
238 -> (Manager h -> IO a) -> IO a
239withManager opts addr hs = bracket (newManager opts addr hs) closeManager
240
241{-----------------------------------------------------------------------
242-- Logging
243-----------------------------------------------------------------------}
244
245-- TODO prettify log messages
246querySignature :: MethodName -> TransactionId -> SockAddr -> Text
247querySignature name transaction addr = T.concat
248#ifdef VERSION_bencoding
249 [ "&", T.decodeUtf8 name
250 , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction
251#else
252 [ "&", T.pack (show name)
253 , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction)
254#endif
255 , " @", T.pack (show addr)
256 ]
257
258{-----------------------------------------------------------------------
259-- Client
260-----------------------------------------------------------------------}
261-- we don't need to know about TransactionId while performing query,
262-- so we introduce QueryFailure exceptions
263
264-- | Used to signal 'query' errors.
265data QueryFailure
266 = SendFailed -- ^ unable to send query;
267 | QueryFailed ErrorCode Text -- ^ remote node return error;
268 | TimeoutExpired -- ^ remote node not responding.
269 deriving (Show, Eq, Typeable)
270
271instance Exception QueryFailure
272
273sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
274sendMessage sock addr a = do
275 liftIO $ sendManyTo sock [a] addr
276
277genTransactionId :: TransactionCounter -> IO TransactionId
278genTransactionId ref = do
279 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur)
280#ifdef VERSION_bencoding
281 return $ BC.pack (show cur)
282#else
283 return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ')
284#endif
285
286-- | How many times 'query' call have been performed.
287getQueryCount :: MonadKRPC h m => m Int
288getQueryCount = do
289 Manager {..} <- getManager
290 curTrans <- liftIO $ readIORef transactionCounter
291 return $ curTrans - optSeedTransaction options
292
293registerQuery :: CallId -> PendingCalls -> IO CallRes
294registerQuery cid ref = do
295 ares <- newEmptyMVar
296 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ())
297 return ares
298
299-- simultaneous M.lookup and M.delete guarantees that we never get two
300-- or more responses to the same query
301unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes)
302unregisterQuery cid ref = do
303 atomicModifyIORef' ref $ swap .
304 M.updateLookupWithKey (const (const Nothing)) cid
305
306
307-- (sendmsg EINVAL)
308sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO ()
309sendQuery sock addr q = handle sockError $ sendMessage sock addr q
310 where
311 sockError :: IOError -> IO ()
312 sockError _ = throwIO SendFailed
313
314-- | Enqueue query to the given node.
315--
316-- This function should throw 'QueryFailure' exception if quered node
317-- respond with @error@ message or the query timeout expires.
318--
319query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b
320query addr params = queryK addr params (\_ x _ -> x)
321
322-- | Like 'query' but possibly returns your externally routable IP address.
323query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP)
324query' addr params = queryK addr params (const (,))
325
326-- | Enqueue a query, but give us the complete BEncoded content sent by the
327-- remote Node. This is useful for handling extensions that this library does
328-- not otherwise support.
329queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs)
330queryRaw addr params = queryK addr params (\raw x _ -> (x,raw))
331
332queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) =>
333 SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x
334queryK addr params kont = do
335 Manager {..} <- getManager
336 tid <- liftIO $ genTransactionId transactionCounter
337 let queryMethod = method :: Method a b
338 let signature = querySignature (methodName queryMethod) tid addr
339 $(logDebugS) "query.sending" signature
340
341 mres <- liftIO $ do
342 ares <- registerQuery (tid, addr) pendingCalls
343
344#ifdef VERSION_bencoding
345 let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid)
346 qb = encodePayload q :: KMessage
347 qbs = encodeHeaders () qb :: BC.ByteString
348#else
349 let q = Tox.Message (methodName queryMethod) cli tid params
350 cli = error "TODO TOX client node id"
351 ctx = error "TODO TOX ToxCipherContext"
352 qb = encodePayload q :: Tox.Message BC.ByteString
353 qbs = encodeHeaders ctx qb :: BC.ByteString
354#endif
355 sendQuery sock addr qbs
356 `onException` unregisterQuery (tid, addr) pendingCalls
357
358 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
359 (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult)
360 case res of
361#ifdef VERSION_bencoding
362 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
363 Right (R (KResponse {..})) ->
364 case fromBEncode respVals of
365 Right r -> pure $ kont raw r respIP
366#else
367 Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR"
368 Right (Tox.Message {..}) ->
369 case S.decode msgPayload of
370 Right r -> pure $ kont raw r Nothing
371#endif
372 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)
373
374 case mres of
375 Just res -> do
376 $(logDebugS) "query.responded" $ signature
377 return res
378
379 Nothing -> do
380 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
381 $(logWarnS) "query.not_responding" $ signature <> " for " <>
382 T.pack (show (optQueryTimeout options)) <> " seconds"
383 throw $ TimeoutExpired
384
385{-----------------------------------------------------------------------
386-- Handlers
387-----------------------------------------------------------------------}
388-- we already throw:
389--
390-- * ErrorCode(MethodUnknown) in the 'dispatchHandler';
391--
392-- * ErrorCode(ServerError) in the 'runHandler';
393--
394-- * ErrorCode(GenericError) in the 'runHandler' (those can be
395-- async exception too)
396--
397-- so HandlerFailure should cover *only* 'ProtocolError's.
398
399-- | Used to signal protocol errors.
400data HandlerFailure
401 = BadAddress -- ^ for e.g.: node calls herself;
402 | InvalidParameter Text -- ^ for e.g.: bad session token.
403 deriving (Show, Eq, Typeable)
404
405instance Exception HandlerFailure
406
407prettyHF :: HandlerFailure -> BS.ByteString
408prettyHF BadAddress = T.encodeUtf8 "bad address"
409prettyHF (InvalidParameter reason) = T.encodeUtf8 $
410 "invalid parameter: " <> reason
411
412prettyQF :: QueryFailure -> BS.ByteString
413prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
414 <> T.pack (show e)
415
416-- | Make handler from handler function. Any thrown exception will be
417-- supressed and send over the wire back to the querying node.
418--
419-- If the handler make some 'query' normally it /should/ handle
420-- corresponding 'QueryFailure's.
421--
422handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b)
423 => (SockAddr -> a -> h b) -> Handler h msg raw
424handler body = (name, wrapper)
425 where
426 Method name = method :: Method a b
427 wrapper :: SockAddr -> msg raw -> h (Either String (msg raw))
428 wrapper addr args =
429 case decodePayload args of
430 Left e -> pure $ Left e
431 Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a)
432
433runHandler :: MonadKRPC h m
434#ifdef VERSION_bencoding
435 => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult
436#else
437 => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult
438#endif
439runHandler h addr m = Lifted.catches wrapper failbacks
440 where
441 signature = querySignature (queryMethod m) (queryId m) addr
442
443 wrapper = do
444 $(logDebugS) "handler.quered" signature
445#ifdef VERSION_bencoding
446 result <- liftHandler (h addr (Q m))
447#else
448 result <- liftHandler (h addr m)
449#endif
450
451 case result of
452 Left msg -> do
453 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg
454#ifdef VERSION_bencoding
455 return $ Left $ KError ProtocolError (BC.pack msg) (queryId m)
456#else
457 return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m)
458#endif
459
460 Right a -> do -- KQueryArgs
461 $(logDebugS) "handler.success" signature
462 return $ Right a
463
464 failbacks =
465 [ E.Handler $ \ (e :: HandlerFailure) -> do
466 $(logDebugS) "handler.failed" signature
467#ifdef VERSION_bencoding
468 return $ Left $ KError ProtocolError (prettyHF e) (queryId m)
469#else
470 return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m)
471#endif
472
473
474 -- may happen if handler makes query and fail
475 , E.Handler $ \ (e :: QueryFailure) -> do
476#ifdef VERSION_bencoding
477 return $ Left $ KError ServerError (prettyQF e) (queryId m)
478#else
479 return $ Left $ decodeError "TODO TOX ServerError" (queryId m)
480#endif
481
482 -- since handler thread exit after sendMessage we can safely
483 -- suppress async exception here
484 , E.Handler $ \ (e :: SomeException) -> do
485#ifdef VERSION_bencoding
486 return $ Left $ KError GenericError (BC.pack (show e)) (queryId m)
487#else
488 return $ Left $ decodeError "TODO TOX GenericError" (queryId m)
489#endif
490 ]
491
492dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult
493dispatchHandler q addr = do
494 Manager {..} <- getManager
495 case L.lookup (queryMethod q) handlers of
496#ifdef VERSION_bencoding
497 Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q)
498#else
499 Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q)
500#endif
501 Just h -> runHandler h addr q
502
503{-----------------------------------------------------------------------
504-- Listener
505-----------------------------------------------------------------------}
506
507-- TODO bound amount of parallel handler *threads*:
508--
509-- peer A flooding with find_node
510-- peer B trying to ping peer C
511-- peer B fork too many threads
512-- ... space leak
513--
514handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m ()
515handleQuery raw q addr = void $ fork $ do
516 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
517 Manager {..} <- getManager
518 res <- dispatchHandler q addr
519#ifdef VERSION_bencoding
520 let res' = either E id res
521 resbe = either toBEncode toBEncode res
522 $(logOther "q") $ T.unlines
523 [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
524 , "==>"
525 , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
526 ]
527 sendMessage sock addr $ encodeHeaders () res'
528#else
529 -- Errors not sent for Tox.
530 let ctx = error "TODO TOX ToxCipherContext 2"
531 either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res
532#endif
533
534handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m ()
535handleResponse raw result addr = do
536 Manager {..} <- getManager
537 liftIO $ do
538#ifdef VERSION_bencoding
539 let resultId = either errorId envelopeTransaction result
540#else
541 let resultId = either Tox.msgNonce Tox.msgNonce result
542#endif
543 mcall <- unregisterQuery (resultId, addr) pendingCalls
544 case mcall of
545 Nothing -> return ()
546 Just ares -> putMVar ares (raw,result)
547
548#ifdef VERSION_bencoding
549handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m ()
550handleMessage raw (Q q) = handleQuery raw q
551handleMessage raw (R r) = handleResponse raw (Right (R r))
552handleMessage raw (E e) = handleResponse raw (Left e)
553#else
554handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m ()
555handleMessage raw q | Tox.isQuery q = handleQuery raw q
556handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r)
557handleMessage raw e | Tox.isError e = handleResponse raw (Left e)
558#endif
559
560listener :: MonadKRPC h m => m ()
561listener = do
562 Manager {..} <- getManager
563 fix $ \again -> do
564 let ctx = error "TODO TOX ToxCipherContext 3"
565 (bs, addr) <- liftIO $ do
566 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
567#ifdef VERSION_bencoding
568 case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of
569#else
570 case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of
571#endif
572 -- TODO ignore unknown messages at all?
573#ifdef VERSION_bencoding
574 Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage)
575#else
576 Left _ -> return () -- TODO TOX send unknownMessage error
577#endif
578 Right (raw,m) -> handleMessage raw m addr
579 again
580 where
581 exceptions :: IOError -> IO (BS.ByteString, SockAddr)
582 exceptions e
583 -- packets with empty payload may trigger eof exception
584 | isEOFError e = return ("", SockAddrInet 0 0)
585 | otherwise = throwIO e
586
587-- | Should be run before any 'query', otherwise they will never
588-- succeed.
589listen :: MonadKRPC h m => m ()
590listen = do
591 Manager {..} <- getManager
592 tid <- fork $ do
593 myThreadId >>= liftIO . flip labelThread "KRPC.listen"
594 listener `Lifted.finally`
595 liftIO (takeMVar listenerThread)
596 liftIO $ putMVar listenerThread tid