diff options
Diffstat (limited to 'src/Network/KRPC.hs')
-rw-r--r-- | src/Network/KRPC.hs | 565 |
1 files changed, 564 insertions, 1 deletions
diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index d185fb4c..22ab81f2 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs | |||
@@ -50,6 +50,16 @@ | |||
50 | -- | 50 | -- |
51 | -- For protocol details see "Network.KRPC.Message" module. | 51 | -- For protocol details see "Network.KRPC.Message" module. |
52 | -- | 52 | -- |
53 | {-# LANGUAGE CPP #-} | ||
54 | {-# LANGUAGE OverloadedStrings #-} | ||
55 | {-# LANGUAGE FlexibleInstances #-} | ||
56 | {-# LANGUAGE FlexibleContexts #-} | ||
57 | {-# LANGUAGE ScopedTypeVariables #-} | ||
58 | {-# LANGUAGE DefaultSignatures #-} | ||
59 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
60 | {-# LANGUAGE FunctionalDependencies #-} | ||
61 | {-# LANGUAGE DeriveDataTypeable #-} | ||
62 | {-# LANGUAGE TemplateHaskell #-} | ||
53 | module Network.KRPC | 63 | module Network.KRPC |
54 | ( -- * Methods | 64 | ( -- * Methods |
55 | Method | 65 | Method |
@@ -87,5 +97,558 @@ module Network.KRPC | |||
87 | import Data.Default.Class | 97 | import Data.Default.Class |
88 | import Network.KRPC.Message | 98 | import Network.KRPC.Message |
89 | import Network.KRPC.Method | 99 | import Network.KRPC.Method |
90 | import Network.KRPC.Manager | ||
91 | import Network.Socket (SockAddr (..)) | 100 | import Network.Socket (SockAddr (..)) |
101 | |||
102 | import Control.Applicative | ||
103 | #ifdef THREAD_DEBUG | ||
104 | import Control.Concurrent.Lifted.Instrument | ||
105 | #else | ||
106 | import GHC.Conc (labelThread) | ||
107 | import Control.Concurrent.Lifted | ||
108 | #endif | ||
109 | import Control.Exception hiding (Handler) | ||
110 | import qualified Control.Exception.Lifted as E (Handler (..)) | ||
111 | import Control.Exception.Lifted as Lifted (catches, finally) | ||
112 | import Control.Monad | ||
113 | import Control.Monad.Logger | ||
114 | import Control.Monad.Reader | ||
115 | import Control.Monad.Trans.Control | ||
116 | #ifdef VERSION_bencoding | ||
117 | import Data.BEncode as BE | ||
118 | import Data.BEncode.Internal as BE | ||
119 | import Data.BEncode.Pretty (showBEncode) | ||
120 | #else | ||
121 | import qualified Data.Tox as Tox | ||
122 | #endif | ||
123 | import qualified Data.ByteString.Base16 as Base16 | ||
124 | import Data.ByteString as BS | ||
125 | import Data.ByteString.Char8 as BC | ||
126 | import Data.ByteString.Lazy as BL | ||
127 | import Data.Default.Class | ||
128 | import Data.IORef | ||
129 | import Data.List as L | ||
130 | import Data.Map as M | ||
131 | import Data.Monoid | ||
132 | import Data.Serialize as S | ||
133 | import Data.Text as T | ||
134 | import Data.Text.Encoding as T | ||
135 | import Data.Tuple | ||
136 | import Data.Typeable | ||
137 | import Network.RPC | ||
138 | import Network.KRPC.Message | ||
139 | import Network.KRPC.Method hiding (Envelope) | ||
140 | import qualified Network.KRPC.Method as KRPC (Envelope) | ||
141 | import Network.Socket hiding (listen) | ||
142 | import Network.Socket.ByteString as BS | ||
143 | import System.IO.Error | ||
144 | import System.Timeout | ||
145 | #ifdef VERSION_bencoding | ||
146 | import Network.DHT.Mainline | ||
147 | #endif | ||
148 | |||
149 | |||
150 | {----------------------------------------------------------------------- | ||
151 | -- Options | ||
152 | -----------------------------------------------------------------------} | ||
153 | |||
154 | -- | RPC manager options. | ||
155 | data Options = Options | ||
156 | { -- | Initial 'TransactionId' incremented with each 'query'; | ||
157 | optSeedTransaction :: {-# UNPACK #-} !Int | ||
158 | |||
159 | -- | Time to wait for response from remote node, in seconds. | ||
160 | , optQueryTimeout :: {-# UNPACK #-} !Int | ||
161 | |||
162 | -- | Maximum number of bytes to receive. | ||
163 | , optMaxMsgSize :: {-# UNPACK #-} !Int | ||
164 | } deriving (Show, Eq) | ||
165 | |||
166 | defaultSeedTransaction :: Int | ||
167 | defaultSeedTransaction = 0 | ||
168 | |||
169 | defaultQueryTimeout :: Int | ||
170 | defaultQueryTimeout = 120 | ||
171 | |||
172 | defaultMaxMsgSize :: Int | ||
173 | defaultMaxMsgSize = 64 * 1024 | ||
174 | |||
175 | -- | Permissive defaults. | ||
176 | instance Default Options where | ||
177 | def = Options | ||
178 | { optSeedTransaction = defaultSeedTransaction | ||
179 | , optQueryTimeout = defaultQueryTimeout | ||
180 | , optMaxMsgSize = defaultMaxMsgSize | ||
181 | } | ||
182 | |||
183 | validateOptions :: Options -> IO () | ||
184 | validateOptions Options {..} | ||
185 | | optQueryTimeout < 1 | ||
186 | = throwIO (userError "krpc: non-positive query timeout") | ||
187 | | optMaxMsgSize < 1 | ||
188 | = throwIO (userError "krpc: non-positive buffer size") | ||
189 | | otherwise = return () | ||
190 | |||
191 | {----------------------------------------------------------------------- | ||
192 | -- Options | ||
193 | -----------------------------------------------------------------------} | ||
194 | |||
195 | type KResult = Either KError KMessage -- Response | ||
196 | |||
197 | type TransactionCounter = IORef Int | ||
198 | type CallId = (TransactionId, SockAddr) | ||
199 | type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response) | ||
200 | type PendingCalls = IORef (Map CallId CallRes) | ||
201 | |||
202 | type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v)) | ||
203 | |||
204 | -- | Handler is a function which will be invoked then some /remote/ | ||
205 | -- node querying /this/ node. | ||
206 | type Handler h msg v = (MethodName, HandlerBody h msg v) | ||
207 | |||
208 | -- | Keep track pending queries made by /this/ node and handle queries | ||
209 | -- made by /remote/ nodes. | ||
210 | data Manager h = Manager | ||
211 | { sock :: !Socket | ||
212 | , options :: !Options | ||
213 | , listenerThread :: !(MVar ThreadId) | ||
214 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | ||
215 | , pendingCalls :: {-# UNPACK #-} !PendingCalls | ||
216 | #ifdef VERSION_bencoding | ||
217 | , handlers :: [Handler h KMessageOf BValue] | ||
218 | #else | ||
219 | , handlers :: [Handler h KMessageOf BC.ByteString] | ||
220 | #endif | ||
221 | } | ||
222 | |||
223 | -- | A monad which can perform or handle queries. | ||
224 | class (MonadBaseControl IO m, MonadLogger m, MonadIO m) | ||
225 | => MonadKRPC h m | m -> h where | ||
226 | |||
227 | -- | Ask for manager. | ||
228 | getManager :: m (Manager h) | ||
229 | |||
230 | default getManager :: MonadReader (Manager h) m => m (Manager h) | ||
231 | getManager = ask | ||
232 | |||
233 | -- | Can be used to add logging for instance. | ||
234 | liftHandler :: h a -> m a | ||
235 | |||
236 | default liftHandler :: m a -> m a | ||
237 | liftHandler = id | ||
238 | |||
239 | instance (MonadBaseControl IO h, MonadLogger h, MonadIO h) | ||
240 | => MonadKRPC h (ReaderT (Manager h) h) where | ||
241 | |||
242 | liftHandler = lift | ||
243 | |||
244 | sockAddrFamily :: SockAddr -> Family | ||
245 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
246 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
247 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
248 | sockAddrFamily (SockAddrCan _ ) = AF_CAN | ||
249 | |||
250 | -- | Bind socket to the specified address. To enable query handling | ||
251 | -- run 'listen'. | ||
252 | newManager :: Options -- ^ various protocol options; | ||
253 | -> SockAddr -- ^ address to listen on; | ||
254 | #ifdef VERSION_bencoding | ||
255 | -> [Handler h KMessageOf BValue] -- ^ handlers to run on incoming queries. | ||
256 | #else | ||
257 | -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries. | ||
258 | #endif | ||
259 | -> IO (Manager h) -- ^ new rpc manager. | ||
260 | newManager opts @ Options {..} servAddr handlers = do | ||
261 | validateOptions opts | ||
262 | sock <- bindServ | ||
263 | tref <- newEmptyMVar | ||
264 | tran <- newIORef optSeedTransaction | ||
265 | calls <- newIORef M.empty | ||
266 | return $ Manager sock opts tref tran calls handlers | ||
267 | where | ||
268 | bindServ = do | ||
269 | let family = sockAddrFamily servAddr | ||
270 | sock <- socket family Datagram defaultProtocol | ||
271 | when (family == AF_INET6) $ do | ||
272 | setSocketOption sock IPv6Only 0 | ||
273 | bindSocket sock servAddr | ||
274 | return sock | ||
275 | |||
276 | -- | Unblock all pending calls and close socket. | ||
277 | closeManager :: Manager m -> IO () | ||
278 | closeManager Manager {..} = do | ||
279 | maybe (return ()) killThread =<< tryTakeMVar listenerThread | ||
280 | -- TODO unblock calls | ||
281 | close sock | ||
282 | |||
283 | -- | Check if the manager is still active. Manager becomes active | ||
284 | -- until 'closeManager' called. | ||
285 | isActive :: Manager m -> IO Bool | ||
286 | isActive Manager {..} = liftIO $ isBound sock | ||
287 | {-# INLINE isActive #-} | ||
288 | |||
289 | -- | Normally you should use Control.Monad.Trans.Resource.allocate | ||
290 | -- function. | ||
291 | #ifdef VERSION_bencoding | ||
292 | withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue] | ||
293 | #else | ||
294 | withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString] | ||
295 | #endif | ||
296 | -> (Manager h -> IO a) -> IO a | ||
297 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager | ||
298 | |||
299 | {----------------------------------------------------------------------- | ||
300 | -- Logging | ||
301 | -----------------------------------------------------------------------} | ||
302 | |||
303 | -- TODO prettify log messages | ||
304 | querySignature :: MethodName -> TransactionId -> SockAddr -> Text | ||
305 | querySignature name transaction addr = T.concat | ||
306 | #ifdef VERSION_bencoding | ||
307 | [ "&", T.decodeUtf8 name | ||
308 | , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction | ||
309 | #else | ||
310 | [ "&", T.pack (show name) | ||
311 | , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction) | ||
312 | #endif | ||
313 | , " @", T.pack (show addr) | ||
314 | ] | ||
315 | |||
316 | {----------------------------------------------------------------------- | ||
317 | -- Client | ||
318 | -----------------------------------------------------------------------} | ||
319 | -- we don't need to know about TransactionId while performing query, | ||
320 | -- so we introduce QueryFailure exceptions | ||
321 | |||
322 | -- | Used to signal 'query' errors. | ||
323 | data QueryFailure | ||
324 | = SendFailed -- ^ unable to send query; | ||
325 | | QueryFailed ErrorCode Text -- ^ remote node return error; | ||
326 | | TimeoutExpired -- ^ remote node not responding. | ||
327 | deriving (Show, Eq, Typeable) | ||
328 | |||
329 | instance Exception QueryFailure | ||
330 | |||
331 | sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m () | ||
332 | sendMessage sock addr a = do | ||
333 | liftIO $ sendManyTo sock [a] addr | ||
334 | |||
335 | genTransactionId :: TransactionCounter -> IO TransactionId | ||
336 | genTransactionId ref = do | ||
337 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) | ||
338 | #ifdef VERSION_bencoding | ||
339 | return $ BC.pack (show cur) | ||
340 | #else | ||
341 | return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ') | ||
342 | #endif | ||
343 | |||
344 | -- | How many times 'query' call have been performed. | ||
345 | getQueryCount :: MonadKRPC h m => m Int | ||
346 | getQueryCount = do | ||
347 | Manager {..} <- getManager | ||
348 | curTrans <- liftIO $ readIORef transactionCounter | ||
349 | return $ curTrans - optSeedTransaction options | ||
350 | |||
351 | registerQuery :: CallId -> PendingCalls -> IO CallRes | ||
352 | registerQuery cid ref = do | ||
353 | ares <- newEmptyMVar | ||
354 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) | ||
355 | return ares | ||
356 | |||
357 | -- simultaneous M.lookup and M.delete guarantees that we never get two | ||
358 | -- or more responses to the same query | ||
359 | unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) | ||
360 | unregisterQuery cid ref = do | ||
361 | atomicModifyIORef' ref $ swap . | ||
362 | M.updateLookupWithKey (const (const Nothing)) cid | ||
363 | |||
364 | |||
365 | -- (sendmsg EINVAL) | ||
366 | sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO () | ||
367 | sendQuery sock addr q = handle sockError $ sendMessage sock addr q | ||
368 | where | ||
369 | sockError :: IOError -> IO () | ||
370 | sockError _ = throwIO SendFailed | ||
371 | |||
372 | -- | Enqueue query to the given node. | ||
373 | -- | ||
374 | -- This function should throw 'QueryFailure' exception if quered node | ||
375 | -- respond with @error@ message or the query timeout expires. | ||
376 | -- | ||
377 | query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b | ||
378 | query addr params = queryK addr params (\_ x _ -> x) | ||
379 | |||
380 | -- | Like 'query' but possibly returns your externally routable IP address. | ||
381 | query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP) | ||
382 | query' addr params = queryK addr params (const (,)) | ||
383 | |||
384 | -- | Enqueue a query, but give us the complete BEncoded content sent by the | ||
385 | -- remote Node. This is useful for handling extensions that this library does | ||
386 | -- not otherwise support. | ||
387 | queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs) | ||
388 | queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) | ||
389 | |||
390 | queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => | ||
391 | SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x | ||
392 | queryK addr params kont = do | ||
393 | Manager {..} <- getManager | ||
394 | tid <- liftIO $ genTransactionId transactionCounter | ||
395 | let queryMethod = method :: Method a b | ||
396 | let signature = querySignature (methodName queryMethod) tid addr | ||
397 | $(logDebugS) "query.sending" signature | ||
398 | |||
399 | mres <- liftIO $ do | ||
400 | ares <- registerQuery (tid, addr) pendingCalls | ||
401 | |||
402 | #ifdef VERSION_bencoding | ||
403 | let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid) | ||
404 | qb = encodePayload q :: KMessage | ||
405 | qbs = encodeHeaders () qb :: BC.ByteString | ||
406 | #else | ||
407 | let q = Tox.Message (methodName queryMethod) cli tid params | ||
408 | cli = error "TODO TOX client node id" | ||
409 | ctx = error "TODO TOX ToxCipherContext" | ||
410 | qb = encodePayload q :: Tox.Message BC.ByteString | ||
411 | qbs = encodeHeaders ctx qb :: BC.ByteString | ||
412 | #endif | ||
413 | sendQuery sock addr qbs | ||
414 | `onException` unregisterQuery (tid, addr) pendingCalls | ||
415 | |||
416 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do | ||
417 | (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult) | ||
418 | case res of | ||
419 | #ifdef VERSION_bencoding | ||
420 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) | ||
421 | Right (R (KResponse {..})) -> | ||
422 | case fromBEncode respVals of | ||
423 | Right r -> pure $ kont raw r respIP | ||
424 | #else | ||
425 | Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR" | ||
426 | Right (Tox.Message {..}) -> | ||
427 | case S.decode msgPayload of | ||
428 | Right r -> pure $ kont raw r Nothing | ||
429 | #endif | ||
430 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) | ||
431 | |||
432 | case mres of | ||
433 | Just res -> do | ||
434 | $(logDebugS) "query.responded" $ signature | ||
435 | return res | ||
436 | |||
437 | Nothing -> do | ||
438 | _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls | ||
439 | $(logWarnS) "query.not_responding" $ signature <> " for " <> | ||
440 | T.pack (show (optQueryTimeout options)) <> " seconds" | ||
441 | throw $ TimeoutExpired | ||
442 | |||
443 | {----------------------------------------------------------------------- | ||
444 | -- Handlers | ||
445 | -----------------------------------------------------------------------} | ||
446 | -- we already throw: | ||
447 | -- | ||
448 | -- * ErrorCode(MethodUnknown) in the 'dispatchHandler'; | ||
449 | -- | ||
450 | -- * ErrorCode(ServerError) in the 'runHandler'; | ||
451 | -- | ||
452 | -- * ErrorCode(GenericError) in the 'runHandler' (those can be | ||
453 | -- async exception too) | ||
454 | -- | ||
455 | -- so HandlerFailure should cover *only* 'ProtocolError's. | ||
456 | |||
457 | -- | Used to signal protocol errors. | ||
458 | data HandlerFailure | ||
459 | = BadAddress -- ^ for e.g.: node calls herself; | ||
460 | | InvalidParameter Text -- ^ for e.g.: bad session token. | ||
461 | deriving (Show, Eq, Typeable) | ||
462 | |||
463 | instance Exception HandlerFailure | ||
464 | |||
465 | prettyHF :: HandlerFailure -> BS.ByteString | ||
466 | prettyHF BadAddress = T.encodeUtf8 "bad address" | ||
467 | prettyHF (InvalidParameter reason) = T.encodeUtf8 $ | ||
468 | "invalid parameter: " <> reason | ||
469 | |||
470 | prettyQF :: QueryFailure -> BS.ByteString | ||
471 | prettyQF e = T.encodeUtf8 $ "handler fail while performing query: " | ||
472 | <> T.pack (show e) | ||
473 | |||
474 | -- | Make handler from handler function. Any thrown exception will be | ||
475 | -- supressed and send over the wire back to the querying node. | ||
476 | -- | ||
477 | -- If the handler make some 'query' normally it /should/ handle | ||
478 | -- corresponding 'QueryFailure's. | ||
479 | -- | ||
480 | handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b) | ||
481 | => (SockAddr -> a -> h b) -> Handler h msg raw | ||
482 | handler body = (name, wrapper) | ||
483 | where | ||
484 | Method name = method :: Method a b | ||
485 | wrapper :: SockAddr -> msg raw -> h (Either String (msg raw)) | ||
486 | wrapper addr args = | ||
487 | case decodePayload args of | ||
488 | Left e -> pure $ Left e | ||
489 | Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a) | ||
490 | |||
491 | runHandler :: MonadKRPC h m | ||
492 | #ifdef VERSION_bencoding | ||
493 | => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult | ||
494 | #else | ||
495 | => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult | ||
496 | #endif | ||
497 | runHandler h addr m = Lifted.catches wrapper failbacks | ||
498 | where | ||
499 | signature = querySignature (queryMethod m) (queryId m) addr | ||
500 | |||
501 | wrapper = do | ||
502 | $(logDebugS) "handler.quered" signature | ||
503 | #ifdef VERSION_bencoding | ||
504 | result <- liftHandler (h addr (Q m)) | ||
505 | #else | ||
506 | result <- liftHandler (h addr m) | ||
507 | #endif | ||
508 | |||
509 | case result of | ||
510 | Left msg -> do | ||
511 | $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg | ||
512 | #ifdef VERSION_bencoding | ||
513 | return $ Left $ KError ProtocolError (BC.pack msg) (queryId m) | ||
514 | #else | ||
515 | return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m) | ||
516 | #endif | ||
517 | |||
518 | Right a -> do -- KQueryArgs | ||
519 | $(logDebugS) "handler.success" signature | ||
520 | return $ Right a | ||
521 | |||
522 | failbacks = | ||
523 | [ E.Handler $ \ (e :: HandlerFailure) -> do | ||
524 | $(logDebugS) "handler.failed" signature | ||
525 | #ifdef VERSION_bencoding | ||
526 | return $ Left $ KError ProtocolError (prettyHF e) (queryId m) | ||
527 | #else | ||
528 | return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m) | ||
529 | #endif | ||
530 | |||
531 | |||
532 | -- may happen if handler makes query and fail | ||
533 | , E.Handler $ \ (e :: QueryFailure) -> do | ||
534 | #ifdef VERSION_bencoding | ||
535 | return $ Left $ KError ServerError (prettyQF e) (queryId m) | ||
536 | #else | ||
537 | return $ Left $ decodeError "TODO TOX ServerError" (queryId m) | ||
538 | #endif | ||
539 | |||
540 | -- since handler thread exit after sendMessage we can safely | ||
541 | -- suppress async exception here | ||
542 | , E.Handler $ \ (e :: SomeException) -> do | ||
543 | #ifdef VERSION_bencoding | ||
544 | return $ Left $ KError GenericError (BC.pack (show e)) (queryId m) | ||
545 | #else | ||
546 | return $ Left $ decodeError "TODO TOX GenericError" (queryId m) | ||
547 | #endif | ||
548 | ] | ||
549 | |||
550 | dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult | ||
551 | dispatchHandler q addr = do | ||
552 | Manager {..} <- getManager | ||
553 | case L.lookup (queryMethod q) handlers of | ||
554 | #ifdef VERSION_bencoding | ||
555 | Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q) | ||
556 | #else | ||
557 | Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q) | ||
558 | #endif | ||
559 | Just h -> runHandler h addr q | ||
560 | |||
561 | {----------------------------------------------------------------------- | ||
562 | -- Listener | ||
563 | -----------------------------------------------------------------------} | ||
564 | |||
565 | -- TODO bound amount of parallel handler *threads*: | ||
566 | -- | ||
567 | -- peer A flooding with find_node | ||
568 | -- peer B trying to ping peer C | ||
569 | -- peer B fork too many threads | ||
570 | -- ... space leak | ||
571 | -- | ||
572 | handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m () | ||
573 | handleQuery raw q addr = void $ fork $ do | ||
574 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" | ||
575 | Manager {..} <- getManager | ||
576 | res <- dispatchHandler q addr | ||
577 | #ifdef VERSION_bencoding | ||
578 | let res' = either E id res | ||
579 | resbe = either toBEncode toBEncode res | ||
580 | $(logOther "q") $ T.unlines | ||
581 | [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) | ||
582 | , "==>" | ||
583 | , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) | ||
584 | ] | ||
585 | sendMessage sock addr $ encodeHeaders () res' | ||
586 | #else | ||
587 | -- Errors not sent for Tox. | ||
588 | let ctx = error "TODO TOX ToxCipherContext 2" | ||
589 | either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res | ||
590 | #endif | ||
591 | |||
592 | handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m () | ||
593 | handleResponse raw result addr = do | ||
594 | Manager {..} <- getManager | ||
595 | liftIO $ do | ||
596 | #ifdef VERSION_bencoding | ||
597 | let resultId = either errorId envelopeTransaction result | ||
598 | #else | ||
599 | let resultId = either Tox.msgNonce Tox.msgNonce result | ||
600 | #endif | ||
601 | mcall <- unregisterQuery (resultId, addr) pendingCalls | ||
602 | case mcall of | ||
603 | Nothing -> return () | ||
604 | Just ares -> putMVar ares (raw,result) | ||
605 | |||
606 | #ifdef VERSION_bencoding | ||
607 | handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m () | ||
608 | handleMessage raw (Q q) = handleQuery raw q | ||
609 | handleMessage raw (R r) = handleResponse raw (Right (R r)) | ||
610 | handleMessage raw (E e) = handleResponse raw (Left e) | ||
611 | #else | ||
612 | handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m () | ||
613 | handleMessage raw q | Tox.isQuery q = handleQuery raw q | ||
614 | handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r) | ||
615 | handleMessage raw e | Tox.isError e = handleResponse raw (Left e) | ||
616 | #endif | ||
617 | |||
618 | listener :: MonadKRPC h m => m () | ||
619 | listener = do | ||
620 | Manager {..} <- getManager | ||
621 | fix $ \again -> do | ||
622 | let ctx = error "TODO TOX ToxCipherContext 3" | ||
623 | (bs, addr) <- liftIO $ do | ||
624 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) | ||
625 | #ifdef VERSION_bencoding | ||
626 | case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of | ||
627 | #else | ||
628 | case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of | ||
629 | #endif | ||
630 | -- TODO ignore unknown messages at all? | ||
631 | #ifdef VERSION_bencoding | ||
632 | Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage) | ||
633 | #else | ||
634 | Left _ -> return () -- TODO TOX send unknownMessage error | ||
635 | #endif | ||
636 | Right (raw,m) -> handleMessage raw m addr | ||
637 | again | ||
638 | where | ||
639 | exceptions :: IOError -> IO (BS.ByteString, SockAddr) | ||
640 | exceptions e | ||
641 | -- packets with empty payload may trigger eof exception | ||
642 | | isEOFError e = return ("", SockAddrInet 0 0) | ||
643 | | otherwise = throwIO e | ||
644 | |||
645 | -- | Should be run before any 'query', otherwise they will never | ||
646 | -- succeed. | ||
647 | listen :: MonadKRPC h m => m () | ||
648 | listen = do | ||
649 | Manager {..} <- getManager | ||
650 | tid <- fork $ do | ||
651 | myThreadId >>= liftIO . flip labelThread "KRPC.listen" | ||
652 | listener `Lifted.finally` | ||
653 | liftIO (takeMVar listenerThread) | ||
654 | liftIO $ putMVar listenerThread tid | ||