diff options
author | joe <joe@jerkface.net> | 2017-06-08 13:03:00 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-06-08 13:03:00 -0400 |
commit | 33db41f8f2c80172aaf6914bd2900b564bb8b82a (patch) | |
tree | 34b3808ef0d8e702dbdacf4e9ad22d1520f487d1 /src | |
parent | 8c33deac14ca92ef67afc7fbcd3f67bc19317f88 (diff) |
Merged Network.KRPC.Manager into Network.KRPC.
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 2 | ||||
-rw-r--r-- | src/Network/KRPC.hs | 565 | ||||
-rw-r--r-- | src/Network/KRPC/Manager.hs | 596 |
3 files changed, 565 insertions, 598 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 56ea262a..a872fc72 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -83,7 +83,7 @@ import Data.Time.Clock.POSIX | |||
83 | 83 | ||
84 | import Network.KRPC as KRPC hiding (Options, def) | 84 | import Network.KRPC as KRPC hiding (Options, def) |
85 | import Network.KRPC.Message (ReflectedIP(..)) | 85 | import Network.KRPC.Message (ReflectedIP(..)) |
86 | import Network.KRPC.Manager (QueryFailure(..)) | 86 | import Network.KRPC (QueryFailure(..)) |
87 | import Data.Torrent | 87 | import Data.Torrent |
88 | import Network.BitTorrent.DHT.Message | 88 | import Network.BitTorrent.DHT.Message |
89 | import Network.BitTorrent.DHT.Routing as R | 89 | import Network.BitTorrent.DHT.Routing as R |
diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index d185fb4c..22ab81f2 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs | |||
@@ -50,6 +50,16 @@ | |||
50 | -- | 50 | -- |
51 | -- For protocol details see "Network.KRPC.Message" module. | 51 | -- For protocol details see "Network.KRPC.Message" module. |
52 | -- | 52 | -- |
53 | {-# LANGUAGE CPP #-} | ||
54 | {-# LANGUAGE OverloadedStrings #-} | ||
55 | {-# LANGUAGE FlexibleInstances #-} | ||
56 | {-# LANGUAGE FlexibleContexts #-} | ||
57 | {-# LANGUAGE ScopedTypeVariables #-} | ||
58 | {-# LANGUAGE DefaultSignatures #-} | ||
59 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
60 | {-# LANGUAGE FunctionalDependencies #-} | ||
61 | {-# LANGUAGE DeriveDataTypeable #-} | ||
62 | {-# LANGUAGE TemplateHaskell #-} | ||
53 | module Network.KRPC | 63 | module Network.KRPC |
54 | ( -- * Methods | 64 | ( -- * Methods |
55 | Method | 65 | Method |
@@ -87,5 +97,558 @@ module Network.KRPC | |||
87 | import Data.Default.Class | 97 | import Data.Default.Class |
88 | import Network.KRPC.Message | 98 | import Network.KRPC.Message |
89 | import Network.KRPC.Method | 99 | import Network.KRPC.Method |
90 | import Network.KRPC.Manager | ||
91 | import Network.Socket (SockAddr (..)) | 100 | import Network.Socket (SockAddr (..)) |
101 | |||
102 | import Control.Applicative | ||
103 | #ifdef THREAD_DEBUG | ||
104 | import Control.Concurrent.Lifted.Instrument | ||
105 | #else | ||
106 | import GHC.Conc (labelThread) | ||
107 | import Control.Concurrent.Lifted | ||
108 | #endif | ||
109 | import Control.Exception hiding (Handler) | ||
110 | import qualified Control.Exception.Lifted as E (Handler (..)) | ||
111 | import Control.Exception.Lifted as Lifted (catches, finally) | ||
112 | import Control.Monad | ||
113 | import Control.Monad.Logger | ||
114 | import Control.Monad.Reader | ||
115 | import Control.Monad.Trans.Control | ||
116 | #ifdef VERSION_bencoding | ||
117 | import Data.BEncode as BE | ||
118 | import Data.BEncode.Internal as BE | ||
119 | import Data.BEncode.Pretty (showBEncode) | ||
120 | #else | ||
121 | import qualified Data.Tox as Tox | ||
122 | #endif | ||
123 | import qualified Data.ByteString.Base16 as Base16 | ||
124 | import Data.ByteString as BS | ||
125 | import Data.ByteString.Char8 as BC | ||
126 | import Data.ByteString.Lazy as BL | ||
127 | import Data.Default.Class | ||
128 | import Data.IORef | ||
129 | import Data.List as L | ||
130 | import Data.Map as M | ||
131 | import Data.Monoid | ||
132 | import Data.Serialize as S | ||
133 | import Data.Text as T | ||
134 | import Data.Text.Encoding as T | ||
135 | import Data.Tuple | ||
136 | import Data.Typeable | ||
137 | import Network.RPC | ||
138 | import Network.KRPC.Message | ||
139 | import Network.KRPC.Method hiding (Envelope) | ||
140 | import qualified Network.KRPC.Method as KRPC (Envelope) | ||
141 | import Network.Socket hiding (listen) | ||
142 | import Network.Socket.ByteString as BS | ||
143 | import System.IO.Error | ||
144 | import System.Timeout | ||
145 | #ifdef VERSION_bencoding | ||
146 | import Network.DHT.Mainline | ||
147 | #endif | ||
148 | |||
149 | |||
150 | {----------------------------------------------------------------------- | ||
151 | -- Options | ||
152 | -----------------------------------------------------------------------} | ||
153 | |||
154 | -- | RPC manager options. | ||
155 | data Options = Options | ||
156 | { -- | Initial 'TransactionId' incremented with each 'query'; | ||
157 | optSeedTransaction :: {-# UNPACK #-} !Int | ||
158 | |||
159 | -- | Time to wait for response from remote node, in seconds. | ||
160 | , optQueryTimeout :: {-# UNPACK #-} !Int | ||
161 | |||
162 | -- | Maximum number of bytes to receive. | ||
163 | , optMaxMsgSize :: {-# UNPACK #-} !Int | ||
164 | } deriving (Show, Eq) | ||
165 | |||
166 | defaultSeedTransaction :: Int | ||
167 | defaultSeedTransaction = 0 | ||
168 | |||
169 | defaultQueryTimeout :: Int | ||
170 | defaultQueryTimeout = 120 | ||
171 | |||
172 | defaultMaxMsgSize :: Int | ||
173 | defaultMaxMsgSize = 64 * 1024 | ||
174 | |||
175 | -- | Permissive defaults. | ||
176 | instance Default Options where | ||
177 | def = Options | ||
178 | { optSeedTransaction = defaultSeedTransaction | ||
179 | , optQueryTimeout = defaultQueryTimeout | ||
180 | , optMaxMsgSize = defaultMaxMsgSize | ||
181 | } | ||
182 | |||
183 | validateOptions :: Options -> IO () | ||
184 | validateOptions Options {..} | ||
185 | | optQueryTimeout < 1 | ||
186 | = throwIO (userError "krpc: non-positive query timeout") | ||
187 | | optMaxMsgSize < 1 | ||
188 | = throwIO (userError "krpc: non-positive buffer size") | ||
189 | | otherwise = return () | ||
190 | |||
191 | {----------------------------------------------------------------------- | ||
192 | -- Options | ||
193 | -----------------------------------------------------------------------} | ||
194 | |||
195 | type KResult = Either KError KMessage -- Response | ||
196 | |||
197 | type TransactionCounter = IORef Int | ||
198 | type CallId = (TransactionId, SockAddr) | ||
199 | type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response) | ||
200 | type PendingCalls = IORef (Map CallId CallRes) | ||
201 | |||
202 | type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v)) | ||
203 | |||
204 | -- | Handler is a function which will be invoked then some /remote/ | ||
205 | -- node querying /this/ node. | ||
206 | type Handler h msg v = (MethodName, HandlerBody h msg v) | ||
207 | |||
208 | -- | Keep track pending queries made by /this/ node and handle queries | ||
209 | -- made by /remote/ nodes. | ||
210 | data Manager h = Manager | ||
211 | { sock :: !Socket | ||
212 | , options :: !Options | ||
213 | , listenerThread :: !(MVar ThreadId) | ||
214 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | ||
215 | , pendingCalls :: {-# UNPACK #-} !PendingCalls | ||
216 | #ifdef VERSION_bencoding | ||
217 | , handlers :: [Handler h KMessageOf BValue] | ||
218 | #else | ||
219 | , handlers :: [Handler h KMessageOf BC.ByteString] | ||
220 | #endif | ||
221 | } | ||
222 | |||
223 | -- | A monad which can perform or handle queries. | ||
224 | class (MonadBaseControl IO m, MonadLogger m, MonadIO m) | ||
225 | => MonadKRPC h m | m -> h where | ||
226 | |||
227 | -- | Ask for manager. | ||
228 | getManager :: m (Manager h) | ||
229 | |||
230 | default getManager :: MonadReader (Manager h) m => m (Manager h) | ||
231 | getManager = ask | ||
232 | |||
233 | -- | Can be used to add logging for instance. | ||
234 | liftHandler :: h a -> m a | ||
235 | |||
236 | default liftHandler :: m a -> m a | ||
237 | liftHandler = id | ||
238 | |||
239 | instance (MonadBaseControl IO h, MonadLogger h, MonadIO h) | ||
240 | => MonadKRPC h (ReaderT (Manager h) h) where | ||
241 | |||
242 | liftHandler = lift | ||
243 | |||
244 | sockAddrFamily :: SockAddr -> Family | ||
245 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
246 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
247 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
248 | sockAddrFamily (SockAddrCan _ ) = AF_CAN | ||
249 | |||
250 | -- | Bind socket to the specified address. To enable query handling | ||
251 | -- run 'listen'. | ||
252 | newManager :: Options -- ^ various protocol options; | ||
253 | -> SockAddr -- ^ address to listen on; | ||
254 | #ifdef VERSION_bencoding | ||
255 | -> [Handler h KMessageOf BValue] -- ^ handlers to run on incoming queries. | ||
256 | #else | ||
257 | -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries. | ||
258 | #endif | ||
259 | -> IO (Manager h) -- ^ new rpc manager. | ||
260 | newManager opts @ Options {..} servAddr handlers = do | ||
261 | validateOptions opts | ||
262 | sock <- bindServ | ||
263 | tref <- newEmptyMVar | ||
264 | tran <- newIORef optSeedTransaction | ||
265 | calls <- newIORef M.empty | ||
266 | return $ Manager sock opts tref tran calls handlers | ||
267 | where | ||
268 | bindServ = do | ||
269 | let family = sockAddrFamily servAddr | ||
270 | sock <- socket family Datagram defaultProtocol | ||
271 | when (family == AF_INET6) $ do | ||
272 | setSocketOption sock IPv6Only 0 | ||
273 | bindSocket sock servAddr | ||
274 | return sock | ||
275 | |||
276 | -- | Unblock all pending calls and close socket. | ||
277 | closeManager :: Manager m -> IO () | ||
278 | closeManager Manager {..} = do | ||
279 | maybe (return ()) killThread =<< tryTakeMVar listenerThread | ||
280 | -- TODO unblock calls | ||
281 | close sock | ||
282 | |||
283 | -- | Check if the manager is still active. Manager becomes active | ||
284 | -- until 'closeManager' called. | ||
285 | isActive :: Manager m -> IO Bool | ||
286 | isActive Manager {..} = liftIO $ isBound sock | ||
287 | {-# INLINE isActive #-} | ||
288 | |||
289 | -- | Normally you should use Control.Monad.Trans.Resource.allocate | ||
290 | -- function. | ||
291 | #ifdef VERSION_bencoding | ||
292 | withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue] | ||
293 | #else | ||
294 | withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString] | ||
295 | #endif | ||
296 | -> (Manager h -> IO a) -> IO a | ||
297 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager | ||
298 | |||
299 | {----------------------------------------------------------------------- | ||
300 | -- Logging | ||
301 | -----------------------------------------------------------------------} | ||
302 | |||
303 | -- TODO prettify log messages | ||
304 | querySignature :: MethodName -> TransactionId -> SockAddr -> Text | ||
305 | querySignature name transaction addr = T.concat | ||
306 | #ifdef VERSION_bencoding | ||
307 | [ "&", T.decodeUtf8 name | ||
308 | , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction | ||
309 | #else | ||
310 | [ "&", T.pack (show name) | ||
311 | , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction) | ||
312 | #endif | ||
313 | , " @", T.pack (show addr) | ||
314 | ] | ||
315 | |||
316 | {----------------------------------------------------------------------- | ||
317 | -- Client | ||
318 | -----------------------------------------------------------------------} | ||
319 | -- we don't need to know about TransactionId while performing query, | ||
320 | -- so we introduce QueryFailure exceptions | ||
321 | |||
322 | -- | Used to signal 'query' errors. | ||
323 | data QueryFailure | ||
324 | = SendFailed -- ^ unable to send query; | ||
325 | | QueryFailed ErrorCode Text -- ^ remote node return error; | ||
326 | | TimeoutExpired -- ^ remote node not responding. | ||
327 | deriving (Show, Eq, Typeable) | ||
328 | |||
329 | instance Exception QueryFailure | ||
330 | |||
331 | sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m () | ||
332 | sendMessage sock addr a = do | ||
333 | liftIO $ sendManyTo sock [a] addr | ||
334 | |||
335 | genTransactionId :: TransactionCounter -> IO TransactionId | ||
336 | genTransactionId ref = do | ||
337 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) | ||
338 | #ifdef VERSION_bencoding | ||
339 | return $ BC.pack (show cur) | ||
340 | #else | ||
341 | return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ') | ||
342 | #endif | ||
343 | |||
344 | -- | How many times 'query' call have been performed. | ||
345 | getQueryCount :: MonadKRPC h m => m Int | ||
346 | getQueryCount = do | ||
347 | Manager {..} <- getManager | ||
348 | curTrans <- liftIO $ readIORef transactionCounter | ||
349 | return $ curTrans - optSeedTransaction options | ||
350 | |||
351 | registerQuery :: CallId -> PendingCalls -> IO CallRes | ||
352 | registerQuery cid ref = do | ||
353 | ares <- newEmptyMVar | ||
354 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) | ||
355 | return ares | ||
356 | |||
357 | -- simultaneous M.lookup and M.delete guarantees that we never get two | ||
358 | -- or more responses to the same query | ||
359 | unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) | ||
360 | unregisterQuery cid ref = do | ||
361 | atomicModifyIORef' ref $ swap . | ||
362 | M.updateLookupWithKey (const (const Nothing)) cid | ||
363 | |||
364 | |||
365 | -- (sendmsg EINVAL) | ||
366 | sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO () | ||
367 | sendQuery sock addr q = handle sockError $ sendMessage sock addr q | ||
368 | where | ||
369 | sockError :: IOError -> IO () | ||
370 | sockError _ = throwIO SendFailed | ||
371 | |||
372 | -- | Enqueue query to the given node. | ||
373 | -- | ||
374 | -- This function should throw 'QueryFailure' exception if quered node | ||
375 | -- respond with @error@ message or the query timeout expires. | ||
376 | -- | ||
377 | query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b | ||
378 | query addr params = queryK addr params (\_ x _ -> x) | ||
379 | |||
380 | -- | Like 'query' but possibly returns your externally routable IP address. | ||
381 | query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP) | ||
382 | query' addr params = queryK addr params (const (,)) | ||
383 | |||
384 | -- | Enqueue a query, but give us the complete BEncoded content sent by the | ||
385 | -- remote Node. This is useful for handling extensions that this library does | ||
386 | -- not otherwise support. | ||
387 | queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs) | ||
388 | queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) | ||
389 | |||
390 | queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => | ||
391 | SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x | ||
392 | queryK addr params kont = do | ||
393 | Manager {..} <- getManager | ||
394 | tid <- liftIO $ genTransactionId transactionCounter | ||
395 | let queryMethod = method :: Method a b | ||
396 | let signature = querySignature (methodName queryMethod) tid addr | ||
397 | $(logDebugS) "query.sending" signature | ||
398 | |||
399 | mres <- liftIO $ do | ||
400 | ares <- registerQuery (tid, addr) pendingCalls | ||
401 | |||
402 | #ifdef VERSION_bencoding | ||
403 | let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid) | ||
404 | qb = encodePayload q :: KMessage | ||
405 | qbs = encodeHeaders () qb :: BC.ByteString | ||
406 | #else | ||
407 | let q = Tox.Message (methodName queryMethod) cli tid params | ||
408 | cli = error "TODO TOX client node id" | ||
409 | ctx = error "TODO TOX ToxCipherContext" | ||
410 | qb = encodePayload q :: Tox.Message BC.ByteString | ||
411 | qbs = encodeHeaders ctx qb :: BC.ByteString | ||
412 | #endif | ||
413 | sendQuery sock addr qbs | ||
414 | `onException` unregisterQuery (tid, addr) pendingCalls | ||
415 | |||
416 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do | ||
417 | (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult) | ||
418 | case res of | ||
419 | #ifdef VERSION_bencoding | ||
420 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) | ||
421 | Right (R (KResponse {..})) -> | ||
422 | case fromBEncode respVals of | ||
423 | Right r -> pure $ kont raw r respIP | ||
424 | #else | ||
425 | Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR" | ||
426 | Right (Tox.Message {..}) -> | ||
427 | case S.decode msgPayload of | ||
428 | Right r -> pure $ kont raw r Nothing | ||
429 | #endif | ||
430 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) | ||
431 | |||
432 | case mres of | ||
433 | Just res -> do | ||
434 | $(logDebugS) "query.responded" $ signature | ||
435 | return res | ||
436 | |||
437 | Nothing -> do | ||
438 | _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls | ||
439 | $(logWarnS) "query.not_responding" $ signature <> " for " <> | ||
440 | T.pack (show (optQueryTimeout options)) <> " seconds" | ||
441 | throw $ TimeoutExpired | ||
442 | |||
443 | {----------------------------------------------------------------------- | ||
444 | -- Handlers | ||
445 | -----------------------------------------------------------------------} | ||
446 | -- we already throw: | ||
447 | -- | ||
448 | -- * ErrorCode(MethodUnknown) in the 'dispatchHandler'; | ||
449 | -- | ||
450 | -- * ErrorCode(ServerError) in the 'runHandler'; | ||
451 | -- | ||
452 | -- * ErrorCode(GenericError) in the 'runHandler' (those can be | ||
453 | -- async exception too) | ||
454 | -- | ||
455 | -- so HandlerFailure should cover *only* 'ProtocolError's. | ||
456 | |||
457 | -- | Used to signal protocol errors. | ||
458 | data HandlerFailure | ||
459 | = BadAddress -- ^ for e.g.: node calls herself; | ||
460 | | InvalidParameter Text -- ^ for e.g.: bad session token. | ||
461 | deriving (Show, Eq, Typeable) | ||
462 | |||
463 | instance Exception HandlerFailure | ||
464 | |||
465 | prettyHF :: HandlerFailure -> BS.ByteString | ||
466 | prettyHF BadAddress = T.encodeUtf8 "bad address" | ||
467 | prettyHF (InvalidParameter reason) = T.encodeUtf8 $ | ||
468 | "invalid parameter: " <> reason | ||
469 | |||
470 | prettyQF :: QueryFailure -> BS.ByteString | ||
471 | prettyQF e = T.encodeUtf8 $ "handler fail while performing query: " | ||
472 | <> T.pack (show e) | ||
473 | |||
474 | -- | Make handler from handler function. Any thrown exception will be | ||
475 | -- supressed and send over the wire back to the querying node. | ||
476 | -- | ||
477 | -- If the handler make some 'query' normally it /should/ handle | ||
478 | -- corresponding 'QueryFailure's. | ||
479 | -- | ||
480 | handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b) | ||
481 | => (SockAddr -> a -> h b) -> Handler h msg raw | ||
482 | handler body = (name, wrapper) | ||
483 | where | ||
484 | Method name = method :: Method a b | ||
485 | wrapper :: SockAddr -> msg raw -> h (Either String (msg raw)) | ||
486 | wrapper addr args = | ||
487 | case decodePayload args of | ||
488 | Left e -> pure $ Left e | ||
489 | Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a) | ||
490 | |||
491 | runHandler :: MonadKRPC h m | ||
492 | #ifdef VERSION_bencoding | ||
493 | => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult | ||
494 | #else | ||
495 | => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult | ||
496 | #endif | ||
497 | runHandler h addr m = Lifted.catches wrapper failbacks | ||
498 | where | ||
499 | signature = querySignature (queryMethod m) (queryId m) addr | ||
500 | |||
501 | wrapper = do | ||
502 | $(logDebugS) "handler.quered" signature | ||
503 | #ifdef VERSION_bencoding | ||
504 | result <- liftHandler (h addr (Q m)) | ||
505 | #else | ||
506 | result <- liftHandler (h addr m) | ||
507 | #endif | ||
508 | |||
509 | case result of | ||
510 | Left msg -> do | ||
511 | $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg | ||
512 | #ifdef VERSION_bencoding | ||
513 | return $ Left $ KError ProtocolError (BC.pack msg) (queryId m) | ||
514 | #else | ||
515 | return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m) | ||
516 | #endif | ||
517 | |||
518 | Right a -> do -- KQueryArgs | ||
519 | $(logDebugS) "handler.success" signature | ||
520 | return $ Right a | ||
521 | |||
522 | failbacks = | ||
523 | [ E.Handler $ \ (e :: HandlerFailure) -> do | ||
524 | $(logDebugS) "handler.failed" signature | ||
525 | #ifdef VERSION_bencoding | ||
526 | return $ Left $ KError ProtocolError (prettyHF e) (queryId m) | ||
527 | #else | ||
528 | return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m) | ||
529 | #endif | ||
530 | |||
531 | |||
532 | -- may happen if handler makes query and fail | ||
533 | , E.Handler $ \ (e :: QueryFailure) -> do | ||
534 | #ifdef VERSION_bencoding | ||
535 | return $ Left $ KError ServerError (prettyQF e) (queryId m) | ||
536 | #else | ||
537 | return $ Left $ decodeError "TODO TOX ServerError" (queryId m) | ||
538 | #endif | ||
539 | |||
540 | -- since handler thread exit after sendMessage we can safely | ||
541 | -- suppress async exception here | ||
542 | , E.Handler $ \ (e :: SomeException) -> do | ||
543 | #ifdef VERSION_bencoding | ||
544 | return $ Left $ KError GenericError (BC.pack (show e)) (queryId m) | ||
545 | #else | ||
546 | return $ Left $ decodeError "TODO TOX GenericError" (queryId m) | ||
547 | #endif | ||
548 | ] | ||
549 | |||
550 | dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult | ||
551 | dispatchHandler q addr = do | ||
552 | Manager {..} <- getManager | ||
553 | case L.lookup (queryMethod q) handlers of | ||
554 | #ifdef VERSION_bencoding | ||
555 | Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q) | ||
556 | #else | ||
557 | Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q) | ||
558 | #endif | ||
559 | Just h -> runHandler h addr q | ||
560 | |||
561 | {----------------------------------------------------------------------- | ||
562 | -- Listener | ||
563 | -----------------------------------------------------------------------} | ||
564 | |||
565 | -- TODO bound amount of parallel handler *threads*: | ||
566 | -- | ||
567 | -- peer A flooding with find_node | ||
568 | -- peer B trying to ping peer C | ||
569 | -- peer B fork too many threads | ||
570 | -- ... space leak | ||
571 | -- | ||
572 | handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m () | ||
573 | handleQuery raw q addr = void $ fork $ do | ||
574 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" | ||
575 | Manager {..} <- getManager | ||
576 | res <- dispatchHandler q addr | ||
577 | #ifdef VERSION_bencoding | ||
578 | let res' = either E id res | ||
579 | resbe = either toBEncode toBEncode res | ||
580 | $(logOther "q") $ T.unlines | ||
581 | [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) | ||
582 | , "==>" | ||
583 | , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) | ||
584 | ] | ||
585 | sendMessage sock addr $ encodeHeaders () res' | ||
586 | #else | ||
587 | -- Errors not sent for Tox. | ||
588 | let ctx = error "TODO TOX ToxCipherContext 2" | ||
589 | either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res | ||
590 | #endif | ||
591 | |||
592 | handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m () | ||
593 | handleResponse raw result addr = do | ||
594 | Manager {..} <- getManager | ||
595 | liftIO $ do | ||
596 | #ifdef VERSION_bencoding | ||
597 | let resultId = either errorId envelopeTransaction result | ||
598 | #else | ||
599 | let resultId = either Tox.msgNonce Tox.msgNonce result | ||
600 | #endif | ||
601 | mcall <- unregisterQuery (resultId, addr) pendingCalls | ||
602 | case mcall of | ||
603 | Nothing -> return () | ||
604 | Just ares -> putMVar ares (raw,result) | ||
605 | |||
606 | #ifdef VERSION_bencoding | ||
607 | handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m () | ||
608 | handleMessage raw (Q q) = handleQuery raw q | ||
609 | handleMessage raw (R r) = handleResponse raw (Right (R r)) | ||
610 | handleMessage raw (E e) = handleResponse raw (Left e) | ||
611 | #else | ||
612 | handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m () | ||
613 | handleMessage raw q | Tox.isQuery q = handleQuery raw q | ||
614 | handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r) | ||
615 | handleMessage raw e | Tox.isError e = handleResponse raw (Left e) | ||
616 | #endif | ||
617 | |||
618 | listener :: MonadKRPC h m => m () | ||
619 | listener = do | ||
620 | Manager {..} <- getManager | ||
621 | fix $ \again -> do | ||
622 | let ctx = error "TODO TOX ToxCipherContext 3" | ||
623 | (bs, addr) <- liftIO $ do | ||
624 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) | ||
625 | #ifdef VERSION_bencoding | ||
626 | case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of | ||
627 | #else | ||
628 | case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of | ||
629 | #endif | ||
630 | -- TODO ignore unknown messages at all? | ||
631 | #ifdef VERSION_bencoding | ||
632 | Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage) | ||
633 | #else | ||
634 | Left _ -> return () -- TODO TOX send unknownMessage error | ||
635 | #endif | ||
636 | Right (raw,m) -> handleMessage raw m addr | ||
637 | again | ||
638 | where | ||
639 | exceptions :: IOError -> IO (BS.ByteString, SockAddr) | ||
640 | exceptions e | ||
641 | -- packets with empty payload may trigger eof exception | ||
642 | | isEOFError e = return ("", SockAddrInet 0 0) | ||
643 | | otherwise = throwIO e | ||
644 | |||
645 | -- | Should be run before any 'query', otherwise they will never | ||
646 | -- succeed. | ||
647 | listen :: MonadKRPC h m => m () | ||
648 | listen = do | ||
649 | Manager {..} <- getManager | ||
650 | tid <- fork $ do | ||
651 | myThreadId >>= liftIO . flip labelThread "KRPC.listen" | ||
652 | listener `Lifted.finally` | ||
653 | liftIO (takeMVar listenerThread) | ||
654 | liftIO $ putMVar listenerThread tid | ||
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs deleted file mode 100644 index efd59f32..00000000 --- a/src/Network/KRPC/Manager.hs +++ /dev/null | |||
@@ -1,596 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013, 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- Normally, you don't need to import this module. | ||
9 | -- | ||
10 | {-# LANGUAGE CPP #-} | ||
11 | {-# LANGUAGE OverloadedStrings #-} | ||
12 | {-# LANGUAGE FlexibleInstances #-} | ||
13 | {-# LANGUAGE FlexibleContexts #-} | ||
14 | {-# LANGUAGE ScopedTypeVariables #-} | ||
15 | {-# LANGUAGE DefaultSignatures #-} | ||
16 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
17 | {-# LANGUAGE FunctionalDependencies #-} | ||
18 | {-# LANGUAGE DeriveDataTypeable #-} | ||
19 | {-# LANGUAGE TemplateHaskell #-} | ||
20 | module Network.KRPC.Manager | ||
21 | ( -- * Manager | ||
22 | MonadKRPC (..) | ||
23 | , Options (..) | ||
24 | , Manager | ||
25 | , newManager | ||
26 | , closeManager | ||
27 | , withManager | ||
28 | , isActive | ||
29 | , listen | ||
30 | |||
31 | -- * Queries | ||
32 | , QueryFailure (..) | ||
33 | , query | ||
34 | , query' | ||
35 | , queryRaw | ||
36 | , getQueryCount | ||
37 | |||
38 | -- * Handlers | ||
39 | , HandlerFailure (..) | ||
40 | , Handler | ||
41 | , handler | ||
42 | ) where | ||
43 | |||
44 | import Control.Applicative | ||
45 | #ifdef THREAD_DEBUG | ||
46 | import Control.Concurrent.Lifted.Instrument | ||
47 | #else | ||
48 | import GHC.Conc (labelThread) | ||
49 | import Control.Concurrent.Lifted | ||
50 | #endif | ||
51 | import Control.Exception hiding (Handler) | ||
52 | import qualified Control.Exception.Lifted as E (Handler (..)) | ||
53 | import Control.Exception.Lifted as Lifted (catches, finally) | ||
54 | import Control.Monad | ||
55 | import Control.Monad.Logger | ||
56 | import Control.Monad.Reader | ||
57 | import Control.Monad.Trans.Control | ||
58 | #ifdef VERSION_bencoding | ||
59 | import Data.BEncode as BE | ||
60 | import Data.BEncode.Internal as BE | ||
61 | import Data.BEncode.Pretty (showBEncode) | ||
62 | #else | ||
63 | import qualified Data.Tox as Tox | ||
64 | #endif | ||
65 | import qualified Data.ByteString.Base16 as Base16 | ||
66 | import Data.ByteString as BS | ||
67 | import Data.ByteString.Char8 as BC | ||
68 | import Data.ByteString.Lazy as BL | ||
69 | import Data.Default.Class | ||
70 | import Data.IORef | ||
71 | import Data.List as L | ||
72 | import Data.Map as M | ||
73 | import Data.Monoid | ||
74 | import Data.Serialize as S | ||
75 | import Data.Text as T | ||
76 | import Data.Text.Encoding as T | ||
77 | import Data.Tuple | ||
78 | import Data.Typeable | ||
79 | import Network.RPC | ||
80 | import Network.KRPC.Message | ||
81 | import Network.KRPC.Method hiding (Envelope) | ||
82 | import qualified Network.KRPC.Method as KRPC (Envelope) | ||
83 | import Network.Socket hiding (listen) | ||
84 | import Network.Socket.ByteString as BS | ||
85 | import System.IO.Error | ||
86 | import System.Timeout | ||
87 | #ifdef VERSION_bencoding | ||
88 | import Network.DHT.Mainline | ||
89 | #endif | ||
90 | |||
91 | |||
92 | {----------------------------------------------------------------------- | ||
93 | -- Options | ||
94 | -----------------------------------------------------------------------} | ||
95 | |||
96 | -- | RPC manager options. | ||
97 | data Options = Options | ||
98 | { -- | Initial 'TransactionId' incremented with each 'query'; | ||
99 | optSeedTransaction :: {-# UNPACK #-} !Int | ||
100 | |||
101 | -- | Time to wait for response from remote node, in seconds. | ||
102 | , optQueryTimeout :: {-# UNPACK #-} !Int | ||
103 | |||
104 | -- | Maximum number of bytes to receive. | ||
105 | , optMaxMsgSize :: {-# UNPACK #-} !Int | ||
106 | } deriving (Show, Eq) | ||
107 | |||
108 | defaultSeedTransaction :: Int | ||
109 | defaultSeedTransaction = 0 | ||
110 | |||
111 | defaultQueryTimeout :: Int | ||
112 | defaultQueryTimeout = 120 | ||
113 | |||
114 | defaultMaxMsgSize :: Int | ||
115 | defaultMaxMsgSize = 64 * 1024 | ||
116 | |||
117 | -- | Permissive defaults. | ||
118 | instance Default Options where | ||
119 | def = Options | ||
120 | { optSeedTransaction = defaultSeedTransaction | ||
121 | , optQueryTimeout = defaultQueryTimeout | ||
122 | , optMaxMsgSize = defaultMaxMsgSize | ||
123 | } | ||
124 | |||
125 | validateOptions :: Options -> IO () | ||
126 | validateOptions Options {..} | ||
127 | | optQueryTimeout < 1 | ||
128 | = throwIO (userError "krpc: non-positive query timeout") | ||
129 | | optMaxMsgSize < 1 | ||
130 | = throwIO (userError "krpc: non-positive buffer size") | ||
131 | | otherwise = return () | ||
132 | |||
133 | {----------------------------------------------------------------------- | ||
134 | -- Options | ||
135 | -----------------------------------------------------------------------} | ||
136 | |||
137 | type KResult = Either KError KMessage -- Response | ||
138 | |||
139 | type TransactionCounter = IORef Int | ||
140 | type CallId = (TransactionId, SockAddr) | ||
141 | type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response) | ||
142 | type PendingCalls = IORef (Map CallId CallRes) | ||
143 | |||
144 | type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v)) | ||
145 | |||
146 | -- | Handler is a function which will be invoked then some /remote/ | ||
147 | -- node querying /this/ node. | ||
148 | type Handler h msg v = (MethodName, HandlerBody h msg v) | ||
149 | |||
150 | -- | Keep track pending queries made by /this/ node and handle queries | ||
151 | -- made by /remote/ nodes. | ||
152 | data Manager h = Manager | ||
153 | { sock :: !Socket | ||
154 | , options :: !Options | ||
155 | , listenerThread :: !(MVar ThreadId) | ||
156 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | ||
157 | , pendingCalls :: {-# UNPACK #-} !PendingCalls | ||
158 | #ifdef VERSION_bencoding | ||
159 | , handlers :: [Handler h KMessageOf BValue] | ||
160 | #else | ||
161 | , handlers :: [Handler h KMessageOf BC.ByteString] | ||
162 | #endif | ||
163 | } | ||
164 | |||
165 | -- | A monad which can perform or handle queries. | ||
166 | class (MonadBaseControl IO m, MonadLogger m, MonadIO m) | ||
167 | => MonadKRPC h m | m -> h where | ||
168 | |||
169 | -- | Ask for manager. | ||
170 | getManager :: m (Manager h) | ||
171 | |||
172 | default getManager :: MonadReader (Manager h) m => m (Manager h) | ||
173 | getManager = ask | ||
174 | |||
175 | -- | Can be used to add logging for instance. | ||
176 | liftHandler :: h a -> m a | ||
177 | |||
178 | default liftHandler :: m a -> m a | ||
179 | liftHandler = id | ||
180 | |||
181 | instance (MonadBaseControl IO h, MonadLogger h, MonadIO h) | ||
182 | => MonadKRPC h (ReaderT (Manager h) h) where | ||
183 | |||
184 | liftHandler = lift | ||
185 | |||
186 | sockAddrFamily :: SockAddr -> Family | ||
187 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
188 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
189 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
190 | sockAddrFamily (SockAddrCan _ ) = AF_CAN | ||
191 | |||
192 | -- | Bind socket to the specified address. To enable query handling | ||
193 | -- run 'listen'. | ||
194 | newManager :: Options -- ^ various protocol options; | ||
195 | -> SockAddr -- ^ address to listen on; | ||
196 | #ifdef VERSION_bencoding | ||
197 | -> [Handler h KMessageOf BValue] -- ^ handlers to run on incoming queries. | ||
198 | #else | ||
199 | -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries. | ||
200 | #endif | ||
201 | -> IO (Manager h) -- ^ new rpc manager. | ||
202 | newManager opts @ Options {..} servAddr handlers = do | ||
203 | validateOptions opts | ||
204 | sock <- bindServ | ||
205 | tref <- newEmptyMVar | ||
206 | tran <- newIORef optSeedTransaction | ||
207 | calls <- newIORef M.empty | ||
208 | return $ Manager sock opts tref tran calls handlers | ||
209 | where | ||
210 | bindServ = do | ||
211 | let family = sockAddrFamily servAddr | ||
212 | sock <- socket family Datagram defaultProtocol | ||
213 | when (family == AF_INET6) $ do | ||
214 | setSocketOption sock IPv6Only 0 | ||
215 | bindSocket sock servAddr | ||
216 | return sock | ||
217 | |||
218 | -- | Unblock all pending calls and close socket. | ||
219 | closeManager :: Manager m -> IO () | ||
220 | closeManager Manager {..} = do | ||
221 | maybe (return ()) killThread =<< tryTakeMVar listenerThread | ||
222 | -- TODO unblock calls | ||
223 | close sock | ||
224 | |||
225 | -- | Check if the manager is still active. Manager becomes active | ||
226 | -- until 'closeManager' called. | ||
227 | isActive :: Manager m -> IO Bool | ||
228 | isActive Manager {..} = liftIO $ isBound sock | ||
229 | {-# INLINE isActive #-} | ||
230 | |||
231 | -- | Normally you should use Control.Monad.Trans.Resource.allocate | ||
232 | -- function. | ||
233 | #ifdef VERSION_bencoding | ||
234 | withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue] | ||
235 | #else | ||
236 | withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString] | ||
237 | #endif | ||
238 | -> (Manager h -> IO a) -> IO a | ||
239 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager | ||
240 | |||
241 | {----------------------------------------------------------------------- | ||
242 | -- Logging | ||
243 | -----------------------------------------------------------------------} | ||
244 | |||
245 | -- TODO prettify log messages | ||
246 | querySignature :: MethodName -> TransactionId -> SockAddr -> Text | ||
247 | querySignature name transaction addr = T.concat | ||
248 | #ifdef VERSION_bencoding | ||
249 | [ "&", T.decodeUtf8 name | ||
250 | , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction | ||
251 | #else | ||
252 | [ "&", T.pack (show name) | ||
253 | , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction) | ||
254 | #endif | ||
255 | , " @", T.pack (show addr) | ||
256 | ] | ||
257 | |||
258 | {----------------------------------------------------------------------- | ||
259 | -- Client | ||
260 | -----------------------------------------------------------------------} | ||
261 | -- we don't need to know about TransactionId while performing query, | ||
262 | -- so we introduce QueryFailure exceptions | ||
263 | |||
264 | -- | Used to signal 'query' errors. | ||
265 | data QueryFailure | ||
266 | = SendFailed -- ^ unable to send query; | ||
267 | | QueryFailed ErrorCode Text -- ^ remote node return error; | ||
268 | | TimeoutExpired -- ^ remote node not responding. | ||
269 | deriving (Show, Eq, Typeable) | ||
270 | |||
271 | instance Exception QueryFailure | ||
272 | |||
273 | sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m () | ||
274 | sendMessage sock addr a = do | ||
275 | liftIO $ sendManyTo sock [a] addr | ||
276 | |||
277 | genTransactionId :: TransactionCounter -> IO TransactionId | ||
278 | genTransactionId ref = do | ||
279 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) | ||
280 | #ifdef VERSION_bencoding | ||
281 | return $ BC.pack (show cur) | ||
282 | #else | ||
283 | return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ') | ||
284 | #endif | ||
285 | |||
286 | -- | How many times 'query' call have been performed. | ||
287 | getQueryCount :: MonadKRPC h m => m Int | ||
288 | getQueryCount = do | ||
289 | Manager {..} <- getManager | ||
290 | curTrans <- liftIO $ readIORef transactionCounter | ||
291 | return $ curTrans - optSeedTransaction options | ||
292 | |||
293 | registerQuery :: CallId -> PendingCalls -> IO CallRes | ||
294 | registerQuery cid ref = do | ||
295 | ares <- newEmptyMVar | ||
296 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) | ||
297 | return ares | ||
298 | |||
299 | -- simultaneous M.lookup and M.delete guarantees that we never get two | ||
300 | -- or more responses to the same query | ||
301 | unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) | ||
302 | unregisterQuery cid ref = do | ||
303 | atomicModifyIORef' ref $ swap . | ||
304 | M.updateLookupWithKey (const (const Nothing)) cid | ||
305 | |||
306 | |||
307 | -- (sendmsg EINVAL) | ||
308 | sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO () | ||
309 | sendQuery sock addr q = handle sockError $ sendMessage sock addr q | ||
310 | where | ||
311 | sockError :: IOError -> IO () | ||
312 | sockError _ = throwIO SendFailed | ||
313 | |||
314 | -- | Enqueue query to the given node. | ||
315 | -- | ||
316 | -- This function should throw 'QueryFailure' exception if quered node | ||
317 | -- respond with @error@ message or the query timeout expires. | ||
318 | -- | ||
319 | query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b | ||
320 | query addr params = queryK addr params (\_ x _ -> x) | ||
321 | |||
322 | -- | Like 'query' but possibly returns your externally routable IP address. | ||
323 | query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP) | ||
324 | query' addr params = queryK addr params (const (,)) | ||
325 | |||
326 | -- | Enqueue a query, but give us the complete BEncoded content sent by the | ||
327 | -- remote Node. This is useful for handling extensions that this library does | ||
328 | -- not otherwise support. | ||
329 | queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs) | ||
330 | queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) | ||
331 | |||
332 | queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => | ||
333 | SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x | ||
334 | queryK addr params kont = do | ||
335 | Manager {..} <- getManager | ||
336 | tid <- liftIO $ genTransactionId transactionCounter | ||
337 | let queryMethod = method :: Method a b | ||
338 | let signature = querySignature (methodName queryMethod) tid addr | ||
339 | $(logDebugS) "query.sending" signature | ||
340 | |||
341 | mres <- liftIO $ do | ||
342 | ares <- registerQuery (tid, addr) pendingCalls | ||
343 | |||
344 | #ifdef VERSION_bencoding | ||
345 | let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid) | ||
346 | qb = encodePayload q :: KMessage | ||
347 | qbs = encodeHeaders () qb :: BC.ByteString | ||
348 | #else | ||
349 | let q = Tox.Message (methodName queryMethod) cli tid params | ||
350 | cli = error "TODO TOX client node id" | ||
351 | ctx = error "TODO TOX ToxCipherContext" | ||
352 | qb = encodePayload q :: Tox.Message BC.ByteString | ||
353 | qbs = encodeHeaders ctx qb :: BC.ByteString | ||
354 | #endif | ||
355 | sendQuery sock addr qbs | ||
356 | `onException` unregisterQuery (tid, addr) pendingCalls | ||
357 | |||
358 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do | ||
359 | (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult) | ||
360 | case res of | ||
361 | #ifdef VERSION_bencoding | ||
362 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) | ||
363 | Right (R (KResponse {..})) -> | ||
364 | case fromBEncode respVals of | ||
365 | Right r -> pure $ kont raw r respIP | ||
366 | #else | ||
367 | Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR" | ||
368 | Right (Tox.Message {..}) -> | ||
369 | case S.decode msgPayload of | ||
370 | Right r -> pure $ kont raw r Nothing | ||
371 | #endif | ||
372 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) | ||
373 | |||
374 | case mres of | ||
375 | Just res -> do | ||
376 | $(logDebugS) "query.responded" $ signature | ||
377 | return res | ||
378 | |||
379 | Nothing -> do | ||
380 | _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls | ||
381 | $(logWarnS) "query.not_responding" $ signature <> " for " <> | ||
382 | T.pack (show (optQueryTimeout options)) <> " seconds" | ||
383 | throw $ TimeoutExpired | ||
384 | |||
385 | {----------------------------------------------------------------------- | ||
386 | -- Handlers | ||
387 | -----------------------------------------------------------------------} | ||
388 | -- we already throw: | ||
389 | -- | ||
390 | -- * ErrorCode(MethodUnknown) in the 'dispatchHandler'; | ||
391 | -- | ||
392 | -- * ErrorCode(ServerError) in the 'runHandler'; | ||
393 | -- | ||
394 | -- * ErrorCode(GenericError) in the 'runHandler' (those can be | ||
395 | -- async exception too) | ||
396 | -- | ||
397 | -- so HandlerFailure should cover *only* 'ProtocolError's. | ||
398 | |||
399 | -- | Used to signal protocol errors. | ||
400 | data HandlerFailure | ||
401 | = BadAddress -- ^ for e.g.: node calls herself; | ||
402 | | InvalidParameter Text -- ^ for e.g.: bad session token. | ||
403 | deriving (Show, Eq, Typeable) | ||
404 | |||
405 | instance Exception HandlerFailure | ||
406 | |||
407 | prettyHF :: HandlerFailure -> BS.ByteString | ||
408 | prettyHF BadAddress = T.encodeUtf8 "bad address" | ||
409 | prettyHF (InvalidParameter reason) = T.encodeUtf8 $ | ||
410 | "invalid parameter: " <> reason | ||
411 | |||
412 | prettyQF :: QueryFailure -> BS.ByteString | ||
413 | prettyQF e = T.encodeUtf8 $ "handler fail while performing query: " | ||
414 | <> T.pack (show e) | ||
415 | |||
416 | -- | Make handler from handler function. Any thrown exception will be | ||
417 | -- supressed and send over the wire back to the querying node. | ||
418 | -- | ||
419 | -- If the handler make some 'query' normally it /should/ handle | ||
420 | -- corresponding 'QueryFailure's. | ||
421 | -- | ||
422 | handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b) | ||
423 | => (SockAddr -> a -> h b) -> Handler h msg raw | ||
424 | handler body = (name, wrapper) | ||
425 | where | ||
426 | Method name = method :: Method a b | ||
427 | wrapper :: SockAddr -> msg raw -> h (Either String (msg raw)) | ||
428 | wrapper addr args = | ||
429 | case decodePayload args of | ||
430 | Left e -> pure $ Left e | ||
431 | Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a) | ||
432 | |||
433 | runHandler :: MonadKRPC h m | ||
434 | #ifdef VERSION_bencoding | ||
435 | => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult | ||
436 | #else | ||
437 | => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult | ||
438 | #endif | ||
439 | runHandler h addr m = Lifted.catches wrapper failbacks | ||
440 | where | ||
441 | signature = querySignature (queryMethod m) (queryId m) addr | ||
442 | |||
443 | wrapper = do | ||
444 | $(logDebugS) "handler.quered" signature | ||
445 | #ifdef VERSION_bencoding | ||
446 | result <- liftHandler (h addr (Q m)) | ||
447 | #else | ||
448 | result <- liftHandler (h addr m) | ||
449 | #endif | ||
450 | |||
451 | case result of | ||
452 | Left msg -> do | ||
453 | $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg | ||
454 | #ifdef VERSION_bencoding | ||
455 | return $ Left $ KError ProtocolError (BC.pack msg) (queryId m) | ||
456 | #else | ||
457 | return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m) | ||
458 | #endif | ||
459 | |||
460 | Right a -> do -- KQueryArgs | ||
461 | $(logDebugS) "handler.success" signature | ||
462 | return $ Right a | ||
463 | |||
464 | failbacks = | ||
465 | [ E.Handler $ \ (e :: HandlerFailure) -> do | ||
466 | $(logDebugS) "handler.failed" signature | ||
467 | #ifdef VERSION_bencoding | ||
468 | return $ Left $ KError ProtocolError (prettyHF e) (queryId m) | ||
469 | #else | ||
470 | return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m) | ||
471 | #endif | ||
472 | |||
473 | |||
474 | -- may happen if handler makes query and fail | ||
475 | , E.Handler $ \ (e :: QueryFailure) -> do | ||
476 | #ifdef VERSION_bencoding | ||
477 | return $ Left $ KError ServerError (prettyQF e) (queryId m) | ||
478 | #else | ||
479 | return $ Left $ decodeError "TODO TOX ServerError" (queryId m) | ||
480 | #endif | ||
481 | |||
482 | -- since handler thread exit after sendMessage we can safely | ||
483 | -- suppress async exception here | ||
484 | , E.Handler $ \ (e :: SomeException) -> do | ||
485 | #ifdef VERSION_bencoding | ||
486 | return $ Left $ KError GenericError (BC.pack (show e)) (queryId m) | ||
487 | #else | ||
488 | return $ Left $ decodeError "TODO TOX GenericError" (queryId m) | ||
489 | #endif | ||
490 | ] | ||
491 | |||
492 | dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult | ||
493 | dispatchHandler q addr = do | ||
494 | Manager {..} <- getManager | ||
495 | case L.lookup (queryMethod q) handlers of | ||
496 | #ifdef VERSION_bencoding | ||
497 | Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q) | ||
498 | #else | ||
499 | Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q) | ||
500 | #endif | ||
501 | Just h -> runHandler h addr q | ||
502 | |||
503 | {----------------------------------------------------------------------- | ||
504 | -- Listener | ||
505 | -----------------------------------------------------------------------} | ||
506 | |||
507 | -- TODO bound amount of parallel handler *threads*: | ||
508 | -- | ||
509 | -- peer A flooding with find_node | ||
510 | -- peer B trying to ping peer C | ||
511 | -- peer B fork too many threads | ||
512 | -- ... space leak | ||
513 | -- | ||
514 | handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m () | ||
515 | handleQuery raw q addr = void $ fork $ do | ||
516 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" | ||
517 | Manager {..} <- getManager | ||
518 | res <- dispatchHandler q addr | ||
519 | #ifdef VERSION_bencoding | ||
520 | let res' = either E id res | ||
521 | resbe = either toBEncode toBEncode res | ||
522 | $(logOther "q") $ T.unlines | ||
523 | [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) | ||
524 | , "==>" | ||
525 | , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) | ||
526 | ] | ||
527 | sendMessage sock addr $ encodeHeaders () res' | ||
528 | #else | ||
529 | -- Errors not sent for Tox. | ||
530 | let ctx = error "TODO TOX ToxCipherContext 2" | ||
531 | either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res | ||
532 | #endif | ||
533 | |||
534 | handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m () | ||
535 | handleResponse raw result addr = do | ||
536 | Manager {..} <- getManager | ||
537 | liftIO $ do | ||
538 | #ifdef VERSION_bencoding | ||
539 | let resultId = either errorId envelopeTransaction result | ||
540 | #else | ||
541 | let resultId = either Tox.msgNonce Tox.msgNonce result | ||
542 | #endif | ||
543 | mcall <- unregisterQuery (resultId, addr) pendingCalls | ||
544 | case mcall of | ||
545 | Nothing -> return () | ||
546 | Just ares -> putMVar ares (raw,result) | ||
547 | |||
548 | #ifdef VERSION_bencoding | ||
549 | handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m () | ||
550 | handleMessage raw (Q q) = handleQuery raw q | ||
551 | handleMessage raw (R r) = handleResponse raw (Right (R r)) | ||
552 | handleMessage raw (E e) = handleResponse raw (Left e) | ||
553 | #else | ||
554 | handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m () | ||
555 | handleMessage raw q | Tox.isQuery q = handleQuery raw q | ||
556 | handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r) | ||
557 | handleMessage raw e | Tox.isError e = handleResponse raw (Left e) | ||
558 | #endif | ||
559 | |||
560 | listener :: MonadKRPC h m => m () | ||
561 | listener = do | ||
562 | Manager {..} <- getManager | ||
563 | fix $ \again -> do | ||
564 | let ctx = error "TODO TOX ToxCipherContext 3" | ||
565 | (bs, addr) <- liftIO $ do | ||
566 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) | ||
567 | #ifdef VERSION_bencoding | ||
568 | case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of | ||
569 | #else | ||
570 | case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of | ||
571 | #endif | ||
572 | -- TODO ignore unknown messages at all? | ||
573 | #ifdef VERSION_bencoding | ||
574 | Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage) | ||
575 | #else | ||
576 | Left _ -> return () -- TODO TOX send unknownMessage error | ||
577 | #endif | ||
578 | Right (raw,m) -> handleMessage raw m addr | ||
579 | again | ||
580 | where | ||
581 | exceptions :: IOError -> IO (BS.ByteString, SockAddr) | ||
582 | exceptions e | ||
583 | -- packets with empty payload may trigger eof exception | ||
584 | | isEOFError e = return ("", SockAddrInet 0 0) | ||
585 | | otherwise = throwIO e | ||
586 | |||
587 | -- | Should be run before any 'query', otherwise they will never | ||
588 | -- succeed. | ||
589 | listen :: MonadKRPC h m => m () | ||
590 | listen = do | ||
591 | Manager {..} <- getManager | ||
592 | tid <- fork $ do | ||
593 | myThreadId >>= liftIO . flip labelThread "KRPC.listen" | ||
594 | listener `Lifted.finally` | ||
595 | liftIO (takeMVar listenerThread) | ||
596 | liftIO $ putMVar listenerThread tid | ||