summaryrefslogtreecommitdiff
path: root/src/Network/KRPC
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-06-08 13:03:00 -0400
committerjoe <joe@jerkface.net>2017-06-08 13:03:00 -0400
commit33db41f8f2c80172aaf6914bd2900b564bb8b82a (patch)
tree34b3808ef0d8e702dbdacf4e9ad22d1520f487d1 /src/Network/KRPC
parent8c33deac14ca92ef67afc7fbcd3f67bc19317f88 (diff)
Merged Network.KRPC.Manager into Network.KRPC.
Diffstat (limited to 'src/Network/KRPC')
-rw-r--r--src/Network/KRPC/Manager.hs596
1 files changed, 0 insertions, 596 deletions
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs
deleted file mode 100644
index efd59f32..00000000
--- a/src/Network/KRPC/Manager.hs
+++ /dev/null
@@ -1,596 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013, 2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Normally, you don't need to import this module.
9--
10{-# LANGUAGE CPP #-}
11{-# LANGUAGE OverloadedStrings #-}
12{-# LANGUAGE FlexibleInstances #-}
13{-# LANGUAGE FlexibleContexts #-}
14{-# LANGUAGE ScopedTypeVariables #-}
15{-# LANGUAGE DefaultSignatures #-}
16{-# LANGUAGE MultiParamTypeClasses #-}
17{-# LANGUAGE FunctionalDependencies #-}
18{-# LANGUAGE DeriveDataTypeable #-}
19{-# LANGUAGE TemplateHaskell #-}
20module Network.KRPC.Manager
21 ( -- * Manager
22 MonadKRPC (..)
23 , Options (..)
24 , Manager
25 , newManager
26 , closeManager
27 , withManager
28 , isActive
29 , listen
30
31 -- * Queries
32 , QueryFailure (..)
33 , query
34 , query'
35 , queryRaw
36 , getQueryCount
37
38 -- * Handlers
39 , HandlerFailure (..)
40 , Handler
41 , handler
42 ) where
43
44import Control.Applicative
45#ifdef THREAD_DEBUG
46import Control.Concurrent.Lifted.Instrument
47#else
48import GHC.Conc (labelThread)
49import Control.Concurrent.Lifted
50#endif
51import Control.Exception hiding (Handler)
52import qualified Control.Exception.Lifted as E (Handler (..))
53import Control.Exception.Lifted as Lifted (catches, finally)
54import Control.Monad
55import Control.Monad.Logger
56import Control.Monad.Reader
57import Control.Monad.Trans.Control
58#ifdef VERSION_bencoding
59import Data.BEncode as BE
60import Data.BEncode.Internal as BE
61import Data.BEncode.Pretty (showBEncode)
62#else
63import qualified Data.Tox as Tox
64#endif
65import qualified Data.ByteString.Base16 as Base16
66import Data.ByteString as BS
67import Data.ByteString.Char8 as BC
68import Data.ByteString.Lazy as BL
69import Data.Default.Class
70import Data.IORef
71import Data.List as L
72import Data.Map as M
73import Data.Monoid
74import Data.Serialize as S
75import Data.Text as T
76import Data.Text.Encoding as T
77import Data.Tuple
78import Data.Typeable
79import Network.RPC
80import Network.KRPC.Message
81import Network.KRPC.Method hiding (Envelope)
82import qualified Network.KRPC.Method as KRPC (Envelope)
83import Network.Socket hiding (listen)
84import Network.Socket.ByteString as BS
85import System.IO.Error
86import System.Timeout
87#ifdef VERSION_bencoding
88import Network.DHT.Mainline
89#endif
90
91
92{-----------------------------------------------------------------------
93-- Options
94-----------------------------------------------------------------------}
95
96-- | RPC manager options.
97data Options = Options
98 { -- | Initial 'TransactionId' incremented with each 'query';
99 optSeedTransaction :: {-# UNPACK #-} !Int
100
101 -- | Time to wait for response from remote node, in seconds.
102 , optQueryTimeout :: {-# UNPACK #-} !Int
103
104 -- | Maximum number of bytes to receive.
105 , optMaxMsgSize :: {-# UNPACK #-} !Int
106 } deriving (Show, Eq)
107
108defaultSeedTransaction :: Int
109defaultSeedTransaction = 0
110
111defaultQueryTimeout :: Int
112defaultQueryTimeout = 120
113
114defaultMaxMsgSize :: Int
115defaultMaxMsgSize = 64 * 1024
116
117-- | Permissive defaults.
118instance Default Options where
119 def = Options
120 { optSeedTransaction = defaultSeedTransaction
121 , optQueryTimeout = defaultQueryTimeout
122 , optMaxMsgSize = defaultMaxMsgSize
123 }
124
125validateOptions :: Options -> IO ()
126validateOptions Options {..}
127 | optQueryTimeout < 1
128 = throwIO (userError "krpc: non-positive query timeout")
129 | optMaxMsgSize < 1
130 = throwIO (userError "krpc: non-positive buffer size")
131 | otherwise = return ()
132
133{-----------------------------------------------------------------------
134-- Options
135-----------------------------------------------------------------------}
136
137type KResult = Either KError KMessage -- Response
138
139type TransactionCounter = IORef Int
140type CallId = (TransactionId, SockAddr)
141type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response)
142type PendingCalls = IORef (Map CallId CallRes)
143
144type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v))
145
146-- | Handler is a function which will be invoked then some /remote/
147-- node querying /this/ node.
148type Handler h msg v = (MethodName, HandlerBody h msg v)
149
150-- | Keep track pending queries made by /this/ node and handle queries
151-- made by /remote/ nodes.
152data Manager h = Manager
153 { sock :: !Socket
154 , options :: !Options
155 , listenerThread :: !(MVar ThreadId)
156 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
157 , pendingCalls :: {-# UNPACK #-} !PendingCalls
158#ifdef VERSION_bencoding
159 , handlers :: [Handler h KMessageOf BValue]
160#else
161 , handlers :: [Handler h KMessageOf BC.ByteString]
162#endif
163 }
164
165-- | A monad which can perform or handle queries.
166class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
167 => MonadKRPC h m | m -> h where
168
169 -- | Ask for manager.
170 getManager :: m (Manager h)
171
172 default getManager :: MonadReader (Manager h) m => m (Manager h)
173 getManager = ask
174
175 -- | Can be used to add logging for instance.
176 liftHandler :: h a -> m a
177
178 default liftHandler :: m a -> m a
179 liftHandler = id
180
181instance (MonadBaseControl IO h, MonadLogger h, MonadIO h)
182 => MonadKRPC h (ReaderT (Manager h) h) where
183
184 liftHandler = lift
185
186sockAddrFamily :: SockAddr -> Family
187sockAddrFamily (SockAddrInet _ _ ) = AF_INET
188sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
189sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
190sockAddrFamily (SockAddrCan _ ) = AF_CAN
191
192-- | Bind socket to the specified address. To enable query handling
193-- run 'listen'.
194newManager :: Options -- ^ various protocol options;
195 -> SockAddr -- ^ address to listen on;
196#ifdef VERSION_bencoding
197 -> [Handler h KMessageOf BValue] -- ^ handlers to run on incoming queries.
198#else
199 -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries.
200#endif
201 -> IO (Manager h) -- ^ new rpc manager.
202newManager opts @ Options {..} servAddr handlers = do
203 validateOptions opts
204 sock <- bindServ
205 tref <- newEmptyMVar
206 tran <- newIORef optSeedTransaction
207 calls <- newIORef M.empty
208 return $ Manager sock opts tref tran calls handlers
209 where
210 bindServ = do
211 let family = sockAddrFamily servAddr
212 sock <- socket family Datagram defaultProtocol
213 when (family == AF_INET6) $ do
214 setSocketOption sock IPv6Only 0
215 bindSocket sock servAddr
216 return sock
217
218-- | Unblock all pending calls and close socket.
219closeManager :: Manager m -> IO ()
220closeManager Manager {..} = do
221 maybe (return ()) killThread =<< tryTakeMVar listenerThread
222 -- TODO unblock calls
223 close sock
224
225-- | Check if the manager is still active. Manager becomes active
226-- until 'closeManager' called.
227isActive :: Manager m -> IO Bool
228isActive Manager {..} = liftIO $ isBound sock
229{-# INLINE isActive #-}
230
231-- | Normally you should use Control.Monad.Trans.Resource.allocate
232-- function.
233#ifdef VERSION_bencoding
234withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue]
235#else
236withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString]
237#endif
238 -> (Manager h -> IO a) -> IO a
239withManager opts addr hs = bracket (newManager opts addr hs) closeManager
240
241{-----------------------------------------------------------------------
242-- Logging
243-----------------------------------------------------------------------}
244
245-- TODO prettify log messages
246querySignature :: MethodName -> TransactionId -> SockAddr -> Text
247querySignature name transaction addr = T.concat
248#ifdef VERSION_bencoding
249 [ "&", T.decodeUtf8 name
250 , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction
251#else
252 [ "&", T.pack (show name)
253 , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction)
254#endif
255 , " @", T.pack (show addr)
256 ]
257
258{-----------------------------------------------------------------------
259-- Client
260-----------------------------------------------------------------------}
261-- we don't need to know about TransactionId while performing query,
262-- so we introduce QueryFailure exceptions
263
264-- | Used to signal 'query' errors.
265data QueryFailure
266 = SendFailed -- ^ unable to send query;
267 | QueryFailed ErrorCode Text -- ^ remote node return error;
268 | TimeoutExpired -- ^ remote node not responding.
269 deriving (Show, Eq, Typeable)
270
271instance Exception QueryFailure
272
273sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
274sendMessage sock addr a = do
275 liftIO $ sendManyTo sock [a] addr
276
277genTransactionId :: TransactionCounter -> IO TransactionId
278genTransactionId ref = do
279 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur)
280#ifdef VERSION_bencoding
281 return $ BC.pack (show cur)
282#else
283 return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ')
284#endif
285
286-- | How many times 'query' call have been performed.
287getQueryCount :: MonadKRPC h m => m Int
288getQueryCount = do
289 Manager {..} <- getManager
290 curTrans <- liftIO $ readIORef transactionCounter
291 return $ curTrans - optSeedTransaction options
292
293registerQuery :: CallId -> PendingCalls -> IO CallRes
294registerQuery cid ref = do
295 ares <- newEmptyMVar
296 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ())
297 return ares
298
299-- simultaneous M.lookup and M.delete guarantees that we never get two
300-- or more responses to the same query
301unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes)
302unregisterQuery cid ref = do
303 atomicModifyIORef' ref $ swap .
304 M.updateLookupWithKey (const (const Nothing)) cid
305
306
307-- (sendmsg EINVAL)
308sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO ()
309sendQuery sock addr q = handle sockError $ sendMessage sock addr q
310 where
311 sockError :: IOError -> IO ()
312 sockError _ = throwIO SendFailed
313
314-- | Enqueue query to the given node.
315--
316-- This function should throw 'QueryFailure' exception if quered node
317-- respond with @error@ message or the query timeout expires.
318--
319query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b
320query addr params = queryK addr params (\_ x _ -> x)
321
322-- | Like 'query' but possibly returns your externally routable IP address.
323query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP)
324query' addr params = queryK addr params (const (,))
325
326-- | Enqueue a query, but give us the complete BEncoded content sent by the
327-- remote Node. This is useful for handling extensions that this library does
328-- not otherwise support.
329queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs)
330queryRaw addr params = queryK addr params (\raw x _ -> (x,raw))
331
332queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) =>
333 SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x
334queryK addr params kont = do
335 Manager {..} <- getManager
336 tid <- liftIO $ genTransactionId transactionCounter
337 let queryMethod = method :: Method a b
338 let signature = querySignature (methodName queryMethod) tid addr
339 $(logDebugS) "query.sending" signature
340
341 mres <- liftIO $ do
342 ares <- registerQuery (tid, addr) pendingCalls
343
344#ifdef VERSION_bencoding
345 let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid)
346 qb = encodePayload q :: KMessage
347 qbs = encodeHeaders () qb :: BC.ByteString
348#else
349 let q = Tox.Message (methodName queryMethod) cli tid params
350 cli = error "TODO TOX client node id"
351 ctx = error "TODO TOX ToxCipherContext"
352 qb = encodePayload q :: Tox.Message BC.ByteString
353 qbs = encodeHeaders ctx qb :: BC.ByteString
354#endif
355 sendQuery sock addr qbs
356 `onException` unregisterQuery (tid, addr) pendingCalls
357
358 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
359 (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult)
360 case res of
361#ifdef VERSION_bencoding
362 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
363 Right (R (KResponse {..})) ->
364 case fromBEncode respVals of
365 Right r -> pure $ kont raw r respIP
366#else
367 Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR"
368 Right (Tox.Message {..}) ->
369 case S.decode msgPayload of
370 Right r -> pure $ kont raw r Nothing
371#endif
372 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)
373
374 case mres of
375 Just res -> do
376 $(logDebugS) "query.responded" $ signature
377 return res
378
379 Nothing -> do
380 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
381 $(logWarnS) "query.not_responding" $ signature <> " for " <>
382 T.pack (show (optQueryTimeout options)) <> " seconds"
383 throw $ TimeoutExpired
384
385{-----------------------------------------------------------------------
386-- Handlers
387-----------------------------------------------------------------------}
388-- we already throw:
389--
390-- * ErrorCode(MethodUnknown) in the 'dispatchHandler';
391--
392-- * ErrorCode(ServerError) in the 'runHandler';
393--
394-- * ErrorCode(GenericError) in the 'runHandler' (those can be
395-- async exception too)
396--
397-- so HandlerFailure should cover *only* 'ProtocolError's.
398
399-- | Used to signal protocol errors.
400data HandlerFailure
401 = BadAddress -- ^ for e.g.: node calls herself;
402 | InvalidParameter Text -- ^ for e.g.: bad session token.
403 deriving (Show, Eq, Typeable)
404
405instance Exception HandlerFailure
406
407prettyHF :: HandlerFailure -> BS.ByteString
408prettyHF BadAddress = T.encodeUtf8 "bad address"
409prettyHF (InvalidParameter reason) = T.encodeUtf8 $
410 "invalid parameter: " <> reason
411
412prettyQF :: QueryFailure -> BS.ByteString
413prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
414 <> T.pack (show e)
415
416-- | Make handler from handler function. Any thrown exception will be
417-- supressed and send over the wire back to the querying node.
418--
419-- If the handler make some 'query' normally it /should/ handle
420-- corresponding 'QueryFailure's.
421--
422handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b)
423 => (SockAddr -> a -> h b) -> Handler h msg raw
424handler body = (name, wrapper)
425 where
426 Method name = method :: Method a b
427 wrapper :: SockAddr -> msg raw -> h (Either String (msg raw))
428 wrapper addr args =
429 case decodePayload args of
430 Left e -> pure $ Left e
431 Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a)
432
433runHandler :: MonadKRPC h m
434#ifdef VERSION_bencoding
435 => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult
436#else
437 => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult
438#endif
439runHandler h addr m = Lifted.catches wrapper failbacks
440 where
441 signature = querySignature (queryMethod m) (queryId m) addr
442
443 wrapper = do
444 $(logDebugS) "handler.quered" signature
445#ifdef VERSION_bencoding
446 result <- liftHandler (h addr (Q m))
447#else
448 result <- liftHandler (h addr m)
449#endif
450
451 case result of
452 Left msg -> do
453 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg
454#ifdef VERSION_bencoding
455 return $ Left $ KError ProtocolError (BC.pack msg) (queryId m)
456#else
457 return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m)
458#endif
459
460 Right a -> do -- KQueryArgs
461 $(logDebugS) "handler.success" signature
462 return $ Right a
463
464 failbacks =
465 [ E.Handler $ \ (e :: HandlerFailure) -> do
466 $(logDebugS) "handler.failed" signature
467#ifdef VERSION_bencoding
468 return $ Left $ KError ProtocolError (prettyHF e) (queryId m)
469#else
470 return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m)
471#endif
472
473
474 -- may happen if handler makes query and fail
475 , E.Handler $ \ (e :: QueryFailure) -> do
476#ifdef VERSION_bencoding
477 return $ Left $ KError ServerError (prettyQF e) (queryId m)
478#else
479 return $ Left $ decodeError "TODO TOX ServerError" (queryId m)
480#endif
481
482 -- since handler thread exit after sendMessage we can safely
483 -- suppress async exception here
484 , E.Handler $ \ (e :: SomeException) -> do
485#ifdef VERSION_bencoding
486 return $ Left $ KError GenericError (BC.pack (show e)) (queryId m)
487#else
488 return $ Left $ decodeError "TODO TOX GenericError" (queryId m)
489#endif
490 ]
491
492dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult
493dispatchHandler q addr = do
494 Manager {..} <- getManager
495 case L.lookup (queryMethod q) handlers of
496#ifdef VERSION_bencoding
497 Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q)
498#else
499 Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q)
500#endif
501 Just h -> runHandler h addr q
502
503{-----------------------------------------------------------------------
504-- Listener
505-----------------------------------------------------------------------}
506
507-- TODO bound amount of parallel handler *threads*:
508--
509-- peer A flooding with find_node
510-- peer B trying to ping peer C
511-- peer B fork too many threads
512-- ... space leak
513--
514handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m ()
515handleQuery raw q addr = void $ fork $ do
516 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
517 Manager {..} <- getManager
518 res <- dispatchHandler q addr
519#ifdef VERSION_bencoding
520 let res' = either E id res
521 resbe = either toBEncode toBEncode res
522 $(logOther "q") $ T.unlines
523 [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
524 , "==>"
525 , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
526 ]
527 sendMessage sock addr $ encodeHeaders () res'
528#else
529 -- Errors not sent for Tox.
530 let ctx = error "TODO TOX ToxCipherContext 2"
531 either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res
532#endif
533
534handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m ()
535handleResponse raw result addr = do
536 Manager {..} <- getManager
537 liftIO $ do
538#ifdef VERSION_bencoding
539 let resultId = either errorId envelopeTransaction result
540#else
541 let resultId = either Tox.msgNonce Tox.msgNonce result
542#endif
543 mcall <- unregisterQuery (resultId, addr) pendingCalls
544 case mcall of
545 Nothing -> return ()
546 Just ares -> putMVar ares (raw,result)
547
548#ifdef VERSION_bencoding
549handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m ()
550handleMessage raw (Q q) = handleQuery raw q
551handleMessage raw (R r) = handleResponse raw (Right (R r))
552handleMessage raw (E e) = handleResponse raw (Left e)
553#else
554handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m ()
555handleMessage raw q | Tox.isQuery q = handleQuery raw q
556handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r)
557handleMessage raw e | Tox.isError e = handleResponse raw (Left e)
558#endif
559
560listener :: MonadKRPC h m => m ()
561listener = do
562 Manager {..} <- getManager
563 fix $ \again -> do
564 let ctx = error "TODO TOX ToxCipherContext 3"
565 (bs, addr) <- liftIO $ do
566 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
567#ifdef VERSION_bencoding
568 case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of
569#else
570 case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of
571#endif
572 -- TODO ignore unknown messages at all?
573#ifdef VERSION_bencoding
574 Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage)
575#else
576 Left _ -> return () -- TODO TOX send unknownMessage error
577#endif
578 Right (raw,m) -> handleMessage raw m addr
579 again
580 where
581 exceptions :: IOError -> IO (BS.ByteString, SockAddr)
582 exceptions e
583 -- packets with empty payload may trigger eof exception
584 | isEOFError e = return ("", SockAddrInet 0 0)
585 | otherwise = throwIO e
586
587-- | Should be run before any 'query', otherwise they will never
588-- succeed.
589listen :: MonadKRPC h m => m ()
590listen = do
591 Manager {..} <- getManager
592 tid <- fork $ do
593 myThreadId >>= liftIO . flip labelThread "KRPC.listen"
594 listener `Lifted.finally`
595 liftIO (takeMVar listenerThread)
596 liftIO $ putMVar listenerThread tid