summaryrefslogtreecommitdiff
path: root/src/Network/KRPC.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/KRPC.hs')
-rw-r--r--src/Network/KRPC.hs565
1 files changed, 564 insertions, 1 deletions
diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs
index d185fb4c..22ab81f2 100644
--- a/src/Network/KRPC.hs
+++ b/src/Network/KRPC.hs
@@ -50,6 +50,16 @@
50-- 50--
51-- For protocol details see "Network.KRPC.Message" module. 51-- For protocol details see "Network.KRPC.Message" module.
52-- 52--
53{-# LANGUAGE CPP #-}
54{-# LANGUAGE OverloadedStrings #-}
55{-# LANGUAGE FlexibleInstances #-}
56{-# LANGUAGE FlexibleContexts #-}
57{-# LANGUAGE ScopedTypeVariables #-}
58{-# LANGUAGE DefaultSignatures #-}
59{-# LANGUAGE MultiParamTypeClasses #-}
60{-# LANGUAGE FunctionalDependencies #-}
61{-# LANGUAGE DeriveDataTypeable #-}
62{-# LANGUAGE TemplateHaskell #-}
53module Network.KRPC 63module Network.KRPC
54 ( -- * Methods 64 ( -- * Methods
55 Method 65 Method
@@ -87,5 +97,558 @@ module Network.KRPC
87import Data.Default.Class 97import Data.Default.Class
88import Network.KRPC.Message 98import Network.KRPC.Message
89import Network.KRPC.Method 99import Network.KRPC.Method
90import Network.KRPC.Manager
91import Network.Socket (SockAddr (..)) 100import Network.Socket (SockAddr (..))
101
102import Control.Applicative
103#ifdef THREAD_DEBUG
104import Control.Concurrent.Lifted.Instrument
105#else
106import GHC.Conc (labelThread)
107import Control.Concurrent.Lifted
108#endif
109import Control.Exception hiding (Handler)
110import qualified Control.Exception.Lifted as E (Handler (..))
111import Control.Exception.Lifted as Lifted (catches, finally)
112import Control.Monad
113import Control.Monad.Logger
114import Control.Monad.Reader
115import Control.Monad.Trans.Control
116#ifdef VERSION_bencoding
117import Data.BEncode as BE
118import Data.BEncode.Internal as BE
119import Data.BEncode.Pretty (showBEncode)
120#else
121import qualified Data.Tox as Tox
122#endif
123import qualified Data.ByteString.Base16 as Base16
124import Data.ByteString as BS
125import Data.ByteString.Char8 as BC
126import Data.ByteString.Lazy as BL
127import Data.Default.Class
128import Data.IORef
129import Data.List as L
130import Data.Map as M
131import Data.Monoid
132import Data.Serialize as S
133import Data.Text as T
134import Data.Text.Encoding as T
135import Data.Tuple
136import Data.Typeable
137import Network.RPC
138import Network.KRPC.Message
139import Network.KRPC.Method hiding (Envelope)
140import qualified Network.KRPC.Method as KRPC (Envelope)
141import Network.Socket hiding (listen)
142import Network.Socket.ByteString as BS
143import System.IO.Error
144import System.Timeout
145#ifdef VERSION_bencoding
146import Network.DHT.Mainline
147#endif
148
149
150{-----------------------------------------------------------------------
151-- Options
152-----------------------------------------------------------------------}
153
154-- | RPC manager options.
155data Options = Options
156 { -- | Initial 'TransactionId' incremented with each 'query';
157 optSeedTransaction :: {-# UNPACK #-} !Int
158
159 -- | Time to wait for response from remote node, in seconds.
160 , optQueryTimeout :: {-# UNPACK #-} !Int
161
162 -- | Maximum number of bytes to receive.
163 , optMaxMsgSize :: {-# UNPACK #-} !Int
164 } deriving (Show, Eq)
165
166defaultSeedTransaction :: Int
167defaultSeedTransaction = 0
168
169defaultQueryTimeout :: Int
170defaultQueryTimeout = 120
171
172defaultMaxMsgSize :: Int
173defaultMaxMsgSize = 64 * 1024
174
175-- | Permissive defaults.
176instance Default Options where
177 def = Options
178 { optSeedTransaction = defaultSeedTransaction
179 , optQueryTimeout = defaultQueryTimeout
180 , optMaxMsgSize = defaultMaxMsgSize
181 }
182
183validateOptions :: Options -> IO ()
184validateOptions Options {..}
185 | optQueryTimeout < 1
186 = throwIO (userError "krpc: non-positive query timeout")
187 | optMaxMsgSize < 1
188 = throwIO (userError "krpc: non-positive buffer size")
189 | otherwise = return ()
190
191{-----------------------------------------------------------------------
192-- Options
193-----------------------------------------------------------------------}
194
195type KResult = Either KError KMessage -- Response
196
197type TransactionCounter = IORef Int
198type CallId = (TransactionId, SockAddr)
199type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response)
200type PendingCalls = IORef (Map CallId CallRes)
201
202type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v))
203
204-- | Handler is a function which will be invoked then some /remote/
205-- node querying /this/ node.
206type Handler h msg v = (MethodName, HandlerBody h msg v)
207
208-- | Keep track pending queries made by /this/ node and handle queries
209-- made by /remote/ nodes.
210data Manager h = Manager
211 { sock :: !Socket
212 , options :: !Options
213 , listenerThread :: !(MVar ThreadId)
214 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
215 , pendingCalls :: {-# UNPACK #-} !PendingCalls
216#ifdef VERSION_bencoding
217 , handlers :: [Handler h KMessageOf BValue]
218#else
219 , handlers :: [Handler h KMessageOf BC.ByteString]
220#endif
221 }
222
223-- | A monad which can perform or handle queries.
224class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
225 => MonadKRPC h m | m -> h where
226
227 -- | Ask for manager.
228 getManager :: m (Manager h)
229
230 default getManager :: MonadReader (Manager h) m => m (Manager h)
231 getManager = ask
232
233 -- | Can be used to add logging for instance.
234 liftHandler :: h a -> m a
235
236 default liftHandler :: m a -> m a
237 liftHandler = id
238
239instance (MonadBaseControl IO h, MonadLogger h, MonadIO h)
240 => MonadKRPC h (ReaderT (Manager h) h) where
241
242 liftHandler = lift
243
244sockAddrFamily :: SockAddr -> Family
245sockAddrFamily (SockAddrInet _ _ ) = AF_INET
246sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
247sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
248sockAddrFamily (SockAddrCan _ ) = AF_CAN
249
250-- | Bind socket to the specified address. To enable query handling
251-- run 'listen'.
252newManager :: Options -- ^ various protocol options;
253 -> SockAddr -- ^ address to listen on;
254#ifdef VERSION_bencoding
255 -> [Handler h KMessageOf BValue] -- ^ handlers to run on incoming queries.
256#else
257 -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries.
258#endif
259 -> IO (Manager h) -- ^ new rpc manager.
260newManager opts @ Options {..} servAddr handlers = do
261 validateOptions opts
262 sock <- bindServ
263 tref <- newEmptyMVar
264 tran <- newIORef optSeedTransaction
265 calls <- newIORef M.empty
266 return $ Manager sock opts tref tran calls handlers
267 where
268 bindServ = do
269 let family = sockAddrFamily servAddr
270 sock <- socket family Datagram defaultProtocol
271 when (family == AF_INET6) $ do
272 setSocketOption sock IPv6Only 0
273 bindSocket sock servAddr
274 return sock
275
276-- | Unblock all pending calls and close socket.
277closeManager :: Manager m -> IO ()
278closeManager Manager {..} = do
279 maybe (return ()) killThread =<< tryTakeMVar listenerThread
280 -- TODO unblock calls
281 close sock
282
283-- | Check if the manager is still active. Manager becomes active
284-- until 'closeManager' called.
285isActive :: Manager m -> IO Bool
286isActive Manager {..} = liftIO $ isBound sock
287{-# INLINE isActive #-}
288
289-- | Normally you should use Control.Monad.Trans.Resource.allocate
290-- function.
291#ifdef VERSION_bencoding
292withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue]
293#else
294withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString]
295#endif
296 -> (Manager h -> IO a) -> IO a
297withManager opts addr hs = bracket (newManager opts addr hs) closeManager
298
299{-----------------------------------------------------------------------
300-- Logging
301-----------------------------------------------------------------------}
302
303-- TODO prettify log messages
304querySignature :: MethodName -> TransactionId -> SockAddr -> Text
305querySignature name transaction addr = T.concat
306#ifdef VERSION_bencoding
307 [ "&", T.decodeUtf8 name
308 , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction
309#else
310 [ "&", T.pack (show name)
311 , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction)
312#endif
313 , " @", T.pack (show addr)
314 ]
315
316{-----------------------------------------------------------------------
317-- Client
318-----------------------------------------------------------------------}
319-- we don't need to know about TransactionId while performing query,
320-- so we introduce QueryFailure exceptions
321
322-- | Used to signal 'query' errors.
323data QueryFailure
324 = SendFailed -- ^ unable to send query;
325 | QueryFailed ErrorCode Text -- ^ remote node return error;
326 | TimeoutExpired -- ^ remote node not responding.
327 deriving (Show, Eq, Typeable)
328
329instance Exception QueryFailure
330
331sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
332sendMessage sock addr a = do
333 liftIO $ sendManyTo sock [a] addr
334
335genTransactionId :: TransactionCounter -> IO TransactionId
336genTransactionId ref = do
337 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur)
338#ifdef VERSION_bencoding
339 return $ BC.pack (show cur)
340#else
341 return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ')
342#endif
343
344-- | How many times 'query' call have been performed.
345getQueryCount :: MonadKRPC h m => m Int
346getQueryCount = do
347 Manager {..} <- getManager
348 curTrans <- liftIO $ readIORef transactionCounter
349 return $ curTrans - optSeedTransaction options
350
351registerQuery :: CallId -> PendingCalls -> IO CallRes
352registerQuery cid ref = do
353 ares <- newEmptyMVar
354 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ())
355 return ares
356
357-- simultaneous M.lookup and M.delete guarantees that we never get two
358-- or more responses to the same query
359unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes)
360unregisterQuery cid ref = do
361 atomicModifyIORef' ref $ swap .
362 M.updateLookupWithKey (const (const Nothing)) cid
363
364
365-- (sendmsg EINVAL)
366sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO ()
367sendQuery sock addr q = handle sockError $ sendMessage sock addr q
368 where
369 sockError :: IOError -> IO ()
370 sockError _ = throwIO SendFailed
371
372-- | Enqueue query to the given node.
373--
374-- This function should throw 'QueryFailure' exception if quered node
375-- respond with @error@ message or the query timeout expires.
376--
377query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b
378query addr params = queryK addr params (\_ x _ -> x)
379
380-- | Like 'query' but possibly returns your externally routable IP address.
381query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP)
382query' addr params = queryK addr params (const (,))
383
384-- | Enqueue a query, but give us the complete BEncoded content sent by the
385-- remote Node. This is useful for handling extensions that this library does
386-- not otherwise support.
387queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs)
388queryRaw addr params = queryK addr params (\raw x _ -> (x,raw))
389
390queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) =>
391 SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x
392queryK addr params kont = do
393 Manager {..} <- getManager
394 tid <- liftIO $ genTransactionId transactionCounter
395 let queryMethod = method :: Method a b
396 let signature = querySignature (methodName queryMethod) tid addr
397 $(logDebugS) "query.sending" signature
398
399 mres <- liftIO $ do
400 ares <- registerQuery (tid, addr) pendingCalls
401
402#ifdef VERSION_bencoding
403 let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid)
404 qb = encodePayload q :: KMessage
405 qbs = encodeHeaders () qb :: BC.ByteString
406#else
407 let q = Tox.Message (methodName queryMethod) cli tid params
408 cli = error "TODO TOX client node id"
409 ctx = error "TODO TOX ToxCipherContext"
410 qb = encodePayload q :: Tox.Message BC.ByteString
411 qbs = encodeHeaders ctx qb :: BC.ByteString
412#endif
413 sendQuery sock addr qbs
414 `onException` unregisterQuery (tid, addr) pendingCalls
415
416 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
417 (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult)
418 case res of
419#ifdef VERSION_bencoding
420 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
421 Right (R (KResponse {..})) ->
422 case fromBEncode respVals of
423 Right r -> pure $ kont raw r respIP
424#else
425 Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR"
426 Right (Tox.Message {..}) ->
427 case S.decode msgPayload of
428 Right r -> pure $ kont raw r Nothing
429#endif
430 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)
431
432 case mres of
433 Just res -> do
434 $(logDebugS) "query.responded" $ signature
435 return res
436
437 Nothing -> do
438 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
439 $(logWarnS) "query.not_responding" $ signature <> " for " <>
440 T.pack (show (optQueryTimeout options)) <> " seconds"
441 throw $ TimeoutExpired
442
443{-----------------------------------------------------------------------
444-- Handlers
445-----------------------------------------------------------------------}
446-- we already throw:
447--
448-- * ErrorCode(MethodUnknown) in the 'dispatchHandler';
449--
450-- * ErrorCode(ServerError) in the 'runHandler';
451--
452-- * ErrorCode(GenericError) in the 'runHandler' (those can be
453-- async exception too)
454--
455-- so HandlerFailure should cover *only* 'ProtocolError's.
456
457-- | Used to signal protocol errors.
458data HandlerFailure
459 = BadAddress -- ^ for e.g.: node calls herself;
460 | InvalidParameter Text -- ^ for e.g.: bad session token.
461 deriving (Show, Eq, Typeable)
462
463instance Exception HandlerFailure
464
465prettyHF :: HandlerFailure -> BS.ByteString
466prettyHF BadAddress = T.encodeUtf8 "bad address"
467prettyHF (InvalidParameter reason) = T.encodeUtf8 $
468 "invalid parameter: " <> reason
469
470prettyQF :: QueryFailure -> BS.ByteString
471prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
472 <> T.pack (show e)
473
474-- | Make handler from handler function. Any thrown exception will be
475-- supressed and send over the wire back to the querying node.
476--
477-- If the handler make some 'query' normally it /should/ handle
478-- corresponding 'QueryFailure's.
479--
480handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b)
481 => (SockAddr -> a -> h b) -> Handler h msg raw
482handler body = (name, wrapper)
483 where
484 Method name = method :: Method a b
485 wrapper :: SockAddr -> msg raw -> h (Either String (msg raw))
486 wrapper addr args =
487 case decodePayload args of
488 Left e -> pure $ Left e
489 Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a)
490
491runHandler :: MonadKRPC h m
492#ifdef VERSION_bencoding
493 => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult
494#else
495 => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult
496#endif
497runHandler h addr m = Lifted.catches wrapper failbacks
498 where
499 signature = querySignature (queryMethod m) (queryId m) addr
500
501 wrapper = do
502 $(logDebugS) "handler.quered" signature
503#ifdef VERSION_bencoding
504 result <- liftHandler (h addr (Q m))
505#else
506 result <- liftHandler (h addr m)
507#endif
508
509 case result of
510 Left msg -> do
511 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg
512#ifdef VERSION_bencoding
513 return $ Left $ KError ProtocolError (BC.pack msg) (queryId m)
514#else
515 return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m)
516#endif
517
518 Right a -> do -- KQueryArgs
519 $(logDebugS) "handler.success" signature
520 return $ Right a
521
522 failbacks =
523 [ E.Handler $ \ (e :: HandlerFailure) -> do
524 $(logDebugS) "handler.failed" signature
525#ifdef VERSION_bencoding
526 return $ Left $ KError ProtocolError (prettyHF e) (queryId m)
527#else
528 return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m)
529#endif
530
531
532 -- may happen if handler makes query and fail
533 , E.Handler $ \ (e :: QueryFailure) -> do
534#ifdef VERSION_bencoding
535 return $ Left $ KError ServerError (prettyQF e) (queryId m)
536#else
537 return $ Left $ decodeError "TODO TOX ServerError" (queryId m)
538#endif
539
540 -- since handler thread exit after sendMessage we can safely
541 -- suppress async exception here
542 , E.Handler $ \ (e :: SomeException) -> do
543#ifdef VERSION_bencoding
544 return $ Left $ KError GenericError (BC.pack (show e)) (queryId m)
545#else
546 return $ Left $ decodeError "TODO TOX GenericError" (queryId m)
547#endif
548 ]
549
550dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult
551dispatchHandler q addr = do
552 Manager {..} <- getManager
553 case L.lookup (queryMethod q) handlers of
554#ifdef VERSION_bencoding
555 Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q)
556#else
557 Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q)
558#endif
559 Just h -> runHandler h addr q
560
561{-----------------------------------------------------------------------
562-- Listener
563-----------------------------------------------------------------------}
564
565-- TODO bound amount of parallel handler *threads*:
566--
567-- peer A flooding with find_node
568-- peer B trying to ping peer C
569-- peer B fork too many threads
570-- ... space leak
571--
572handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m ()
573handleQuery raw q addr = void $ fork $ do
574 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
575 Manager {..} <- getManager
576 res <- dispatchHandler q addr
577#ifdef VERSION_bencoding
578 let res' = either E id res
579 resbe = either toBEncode toBEncode res
580 $(logOther "q") $ T.unlines
581 [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
582 , "==>"
583 , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
584 ]
585 sendMessage sock addr $ encodeHeaders () res'
586#else
587 -- Errors not sent for Tox.
588 let ctx = error "TODO TOX ToxCipherContext 2"
589 either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res
590#endif
591
592handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m ()
593handleResponse raw result addr = do
594 Manager {..} <- getManager
595 liftIO $ do
596#ifdef VERSION_bencoding
597 let resultId = either errorId envelopeTransaction result
598#else
599 let resultId = either Tox.msgNonce Tox.msgNonce result
600#endif
601 mcall <- unregisterQuery (resultId, addr) pendingCalls
602 case mcall of
603 Nothing -> return ()
604 Just ares -> putMVar ares (raw,result)
605
606#ifdef VERSION_bencoding
607handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m ()
608handleMessage raw (Q q) = handleQuery raw q
609handleMessage raw (R r) = handleResponse raw (Right (R r))
610handleMessage raw (E e) = handleResponse raw (Left e)
611#else
612handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m ()
613handleMessage raw q | Tox.isQuery q = handleQuery raw q
614handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r)
615handleMessage raw e | Tox.isError e = handleResponse raw (Left e)
616#endif
617
618listener :: MonadKRPC h m => m ()
619listener = do
620 Manager {..} <- getManager
621 fix $ \again -> do
622 let ctx = error "TODO TOX ToxCipherContext 3"
623 (bs, addr) <- liftIO $ do
624 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
625#ifdef VERSION_bencoding
626 case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of
627#else
628 case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of
629#endif
630 -- TODO ignore unknown messages at all?
631#ifdef VERSION_bencoding
632 Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage)
633#else
634 Left _ -> return () -- TODO TOX send unknownMessage error
635#endif
636 Right (raw,m) -> handleMessage raw m addr
637 again
638 where
639 exceptions :: IOError -> IO (BS.ByteString, SockAddr)
640 exceptions e
641 -- packets with empty payload may trigger eof exception
642 | isEOFError e = return ("", SockAddrInet 0 0)
643 | otherwise = throwIO e
644
645-- | Should be run before any 'query', otherwise they will never
646-- succeed.
647listen :: MonadKRPC h m => m ()
648listen = do
649 Manager {..} <- getManager
650 tid <- fork $ do
651 myThreadId >>= liftIO . flip labelThread "KRPC.listen"
652 listener `Lifted.finally`
653 liftIO (takeMVar listenerThread)
654 liftIO $ putMVar listenerThread tid