summaryrefslogtreecommitdiff
path: root/src/Network/DatagramServer.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/DatagramServer.hs')
-rw-r--r--src/Network/DatagramServer.hs292
1 files changed, 130 insertions, 162 deletions
diff --git a/src/Network/DatagramServer.hs b/src/Network/DatagramServer.hs
index 21300108..e1bf91c5 100644
--- a/src/Network/DatagramServer.hs
+++ b/src/Network/DatagramServer.hs
@@ -60,6 +60,7 @@
60{-# LANGUAGE FunctionalDependencies #-} 60{-# LANGUAGE FunctionalDependencies #-}
61{-# LANGUAGE DeriveDataTypeable #-} 61{-# LANGUAGE DeriveDataTypeable #-}
62{-# LANGUAGE TemplateHaskell #-} 62{-# LANGUAGE TemplateHaskell #-}
63{-# LANGUAGE KindSignatures #-}
63module Network.DatagramServer 64module Network.DatagramServer
64 ( -- * Methods 65 ( -- * Methods
65 Method 66 Method
@@ -88,6 +89,7 @@ module Network.DatagramServer
88 , withManager 89 , withManager
89 , isActive 90 , isActive
90 , listen 91 , listen
92 , Protocol(..)
91 93
92 -- * Re-exports 94 -- * Re-exports
93 , ErrorCode (..) 95 , ErrorCode (..)
@@ -96,7 +98,6 @@ module Network.DatagramServer
96 98
97import Data.Default.Class 99import Data.Default.Class
98import Network.DatagramServer.Mainline 100import Network.DatagramServer.Mainline
99import Network.KRPC.Method
100import Network.Socket (SockAddr (..)) 101import Network.Socket (SockAddr (..))
101 102
102import Control.Applicative 103import Control.Applicative
@@ -192,42 +193,38 @@ validateOptions Options {..}
192-- Options 193-- Options
193-----------------------------------------------------------------------} 194-----------------------------------------------------------------------}
194 195
195type KResult = Either KError KMessage -- Response 196type KResult msg raw = Either (KError (TransactionID msg)) (msg raw)-- Response
196 197
197type TransactionCounter = IORef Int 198type TransactionCounter = IORef Int
198type CallId = (TransactionId, SockAddr) 199type CallId msg = (TransactionID msg, SockAddr)
199type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response) 200type CallRes msg raw = MVar (raw, KResult msg raw) -- (raw response, decoded response)
200type PendingCalls = IORef (Map CallId CallRes) 201type PendingCalls msg raw = IORef (Map (CallId msg) (CallRes msg raw))
201 202
202type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v)) 203type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v))
203 204
204-- | Handler is a function which will be invoked then some /remote/ 205-- | Handler is a function which will be invoked then some /remote/
205-- node querying /this/ node. 206-- node querying /this/ node.
206type Handler h msg v = (MethodName, HandlerBody h msg v) 207type Handler h msg v = (QueryMethod msg, HandlerBody h msg v)
207 208
208-- | Keep track pending queries made by /this/ node and handle queries 209-- | Keep track pending queries made by /this/ node and handle queries
209-- made by /remote/ nodes. 210-- made by /remote/ nodes.
210data Manager h = Manager 211data Manager h raw msg = Manager
211 { sock :: !Socket 212 { sock :: !Socket
212 , options :: !Options 213 , options :: !Options
213 , listenerThread :: !(MVar ThreadId) 214 , listenerThread :: !(MVar ThreadId)
214 , transactionCounter :: {-# UNPACK #-} !TransactionCounter 215 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
215 , pendingCalls :: {-# UNPACK #-} !PendingCalls 216 , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw)
216#ifdef VERSION_bencoding 217 , handlers :: [Handler h msg raw]
217 , handlers :: [Handler h KMessageOf BValue]
218#else
219 , handlers :: [Handler h KMessageOf BC.ByteString]
220#endif
221 } 218 }
222 219
223-- | A monad which can perform or handle queries. 220-- | A monad which can perform or handle queries.
224class (MonadBaseControl IO m, MonadLogger m, MonadIO m) 221class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
225 => MonadKRPC h m | m -> h where 222 => MonadKRPC h m raw msg | m -> h, m -> raw, m -> msg where
226 223
227 -- | Ask for manager. 224 -- | Ask for manager.
228 getManager :: m (Manager h) 225 getManager :: m (Manager h raw msg)
229 226
230 default getManager :: MonadReader (Manager h) m => m (Manager h) 227 default getManager :: MonadReader (Manager h raw msg) m => m (Manager h raw msg)
231 getManager = ask 228 getManager = ask
232 229
233 -- | Can be used to add logging for instance. 230 -- | Can be used to add logging for instance.
@@ -237,7 +234,7 @@ class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
237 liftHandler = id 234 liftHandler = id
238 235
239instance (MonadBaseControl IO h, MonadLogger h, MonadIO h) 236instance (MonadBaseControl IO h, MonadLogger h, MonadIO h)
240 => MonadKRPC h (ReaderT (Manager h) h) where 237 => MonadKRPC h (ReaderT (Manager h raw msg) h) raw msg where
241 238
242 liftHandler = lift 239 liftHandler = lift
243 240
@@ -251,12 +248,8 @@ sockAddrFamily (SockAddrCan _ ) = AF_CAN
251-- run 'listen'. 248-- run 'listen'.
252newManager :: Options -- ^ various protocol options; 249newManager :: Options -- ^ various protocol options;
253 -> SockAddr -- ^ address to listen on; 250 -> SockAddr -- ^ address to listen on;
254#ifdef VERSION_bencoding 251 -> [Handler h msg raw] -- ^ handlers to run on incoming queries.
255 -> [Handler h KMessageOf BValue] -- ^ handlers to run on incoming queries. 252 -> IO (Manager h raw msg) -- ^ new rpc manager.
256#else
257 -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries.
258#endif
259 -> IO (Manager h) -- ^ new rpc manager.
260newManager opts @ Options {..} servAddr handlers = do 253newManager opts @ Options {..} servAddr handlers = do
261 validateOptions opts 254 validateOptions opts
262 sock <- bindServ 255 sock <- bindServ
@@ -274,7 +267,7 @@ newManager opts @ Options {..} servAddr handlers = do
274 return sock 267 return sock
275 268
276-- | Unblock all pending calls and close socket. 269-- | Unblock all pending calls and close socket.
277closeManager :: Manager m -> IO () 270closeManager :: Manager m raw msg -> IO ()
278closeManager Manager {..} = do 271closeManager Manager {..} = do
279 maybe (return ()) killThread =<< tryTakeMVar listenerThread 272 maybe (return ()) killThread =<< tryTakeMVar listenerThread
280 -- TODO unblock calls 273 -- TODO unblock calls
@@ -282,18 +275,14 @@ closeManager Manager {..} = do
282 275
283-- | Check if the manager is still active. Manager becomes active 276-- | Check if the manager is still active. Manager becomes active
284-- until 'closeManager' called. 277-- until 'closeManager' called.
285isActive :: Manager m -> IO Bool 278isActive :: Manager m raw msg -> IO Bool
286isActive Manager {..} = liftIO $ isBound sock 279isActive Manager {..} = liftIO $ isBound sock
287{-# INLINE isActive #-} 280{-# INLINE isActive #-}
288 281
289-- | Normally you should use Control.Monad.Trans.Resource.allocate 282-- | Normally you should use Control.Monad.Trans.Resource.allocate
290-- function. 283-- function.
291#ifdef VERSION_bencoding 284withManager :: Options -> SockAddr -> [Handler h msg raw]
292withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue] 285 -> (Manager h raw msg -> IO a) -> IO a
293#else
294withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString]
295#endif
296 -> (Manager h -> IO a) -> IO a
297withManager opts addr hs = bracket (newManager opts addr hs) closeManager 286withManager opts addr hs = bracket (newManager opts addr hs) closeManager
298 287
299{----------------------------------------------------------------------- 288{-----------------------------------------------------------------------
@@ -301,15 +290,12 @@ withManager opts addr hs = bracket (newManager opts addr hs) closeManager
301-----------------------------------------------------------------------} 290-----------------------------------------------------------------------}
302 291
303-- TODO prettify log messages 292-- TODO prettify log messages
304querySignature :: MethodName -> TransactionId -> SockAddr -> Text 293querySignature :: ( Show ( QueryMethod msg )
294 , Serialize ( TransactionID msg ) )
295 => QueryMethod msg -> TransactionID msg -> SockAddr -> Text
305querySignature name transaction addr = T.concat 296querySignature name transaction addr = T.concat
306#ifdef VERSION_bencoding
307 [ "&", T.decodeUtf8 name
308 , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction
309#else
310 [ "&", T.pack (show name) 297 [ "&", T.pack (show name)
311 , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction) 298 , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction)
312#endif
313 , " @", T.pack (show addr) 299 , " @", T.pack (show addr)
314 ] 300 ]
315 301
@@ -332,23 +318,19 @@ sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
332sendMessage sock addr a = do 318sendMessage sock addr a = do
333 liftIO $ sendManyTo sock [a] addr 319 liftIO $ sendManyTo sock [a] addr
334 320
335genTransactionId :: TransactionCounter -> IO TransactionId 321genTransactionId :: Envelope msg => TransactionCounter -> IO (TransactionID msg)
336genTransactionId ref = do 322genTransactionId ref = do
337 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) 323 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur)
338#ifdef VERSION_bencoding 324 uniqueTransactionId cur
339 return $ BC.pack (show cur)
340#else
341 return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ')
342#endif
343 325
344-- | How many times 'query' call have been performed. 326-- | How many times 'query' call have been performed.
345getQueryCount :: MonadKRPC h m => m Int 327getQueryCount :: MonadKRPC h m raw msg => m Int
346getQueryCount = do 328getQueryCount = do
347 Manager {..} <- getManager 329 Manager {..} <- getManager
348 curTrans <- liftIO $ readIORef transactionCounter 330 curTrans <- liftIO $ readIORef transactionCounter
349 return $ curTrans - optSeedTransaction options 331 return $ curTrans - optSeedTransaction options
350 332
351registerQuery :: CallId -> PendingCalls -> IO CallRes 333registerQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (CallRes msg raw)
352registerQuery cid ref = do 334registerQuery cid ref = do
353 ares <- newEmptyMVar 335 ares <- newEmptyMVar
354 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) 336 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ())
@@ -356,7 +338,7 @@ registerQuery cid ref = do
356 338
357-- simultaneous M.lookup and M.delete guarantees that we never get two 339-- simultaneous M.lookup and M.delete guarantees that we never get two
358-- or more responses to the same query 340-- or more responses to the same query
359unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) 341unregisterQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (Maybe (CallRes msg raw))
360unregisterQuery cid ref = do 342unregisterQuery cid ref = do
361 atomicModifyIORef' ref $ swap . 343 atomicModifyIORef' ref $ swap .
362 M.updateLookupWithKey (const (const Nothing)) cid 344 M.updateLookupWithKey (const (const Nothing)) cid
@@ -374,35 +356,37 @@ sendQuery sock addr q = handle sockError $ sendMessage sock addr q
374-- This function should throw 'QueryFailure' exception if quered node 356-- This function should throw 'QueryFailure' exception if quered node
375-- respond with @error@ message or the query timeout expires. 357-- respond with @error@ message or the query timeout expires.
376-- 358--
377query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b 359query :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg, KRPC a b) => QueryMethod msg -> SockAddr -> a -> m b
378query addr params = queryK addr params (\_ x _ -> x) 360query meth addr params = queryK meth addr params (\_ x _ -> x)
379 361
380-- | Like 'query' but possibly returns your externally routable IP address. 362-- | Like 'query' but possibly returns your externally routable IP address.
381query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP) 363query' :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg, KRPC a b) => QueryMethod msg -> SockAddr -> a -> m (b, Maybe ReflectedIP)
382query' addr params = queryK addr params (const (,)) 364query' meth addr params = queryK meth addr params (const (,))
383 365
384-- | Enqueue a query, but give us the complete BEncoded content sent by the 366-- | Enqueue a query, but give us the complete BEncoded content sent by the
385-- remote Node. This is useful for handling extensions that this library does 367-- remote Node. This is useful for handling extensions that this library does
386-- not otherwise support. 368-- not otherwise support.
387queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs) 369queryRaw :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg, KRPC a b) => QueryMethod msg -> SockAddr -> a -> m (b, raw)
388queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) 370queryRaw meth addr params = queryK meth addr params (\raw x _ -> (x,raw))
389 371
390queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => 372queryK :: forall h m a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg, KRPC a b) =>
391 SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x 373 QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> m x
392queryK addr params kont = do 374queryK meth addr params kont = do
393 Manager {..} <- getManager 375 Manager {..} <- getManager
394 tid <- liftIO $ genTransactionId transactionCounter 376 tid <- liftIO $ genTransactionId transactionCounter
395 let queryMethod = method :: Method a b 377 -- let queryMethod = method :: Method a b
396 let signature = querySignature (methodName queryMethod) tid addr 378 let signature = querySignature meth tid addr
397 $(logDebugS) "query.sending" signature 379 $(logDebugS) "query.sending" signature
398 380
399 mres <- liftIO $ do 381 mres <- liftIO $ do
400 ares <- registerQuery (tid, addr) pendingCalls 382 ares <- registerQuery (tid, addr) pendingCalls
401 383
384 let cli = error "TODO TOX client node id"
385 ctx = error "TODO TOX ToxCipherContext or () for Mainline"
386 q <- buildQuery cli addr meth tid params
387 let qb = encodePayload (q :: msg a) :: msg raw
388 qbs = encodeHeaders ctx qb
402#ifdef VERSION_bencoding 389#ifdef VERSION_bencoding
403 let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid)
404 qb = encodePayload q :: KMessage
405 qbs = encodeHeaders () qb :: BC.ByteString
406#else 390#else
407 let q = Tox.Message (methodName queryMethod) cli tid params 391 let q = Tox.Message (methodName queryMethod) cli tid params
408 cli = error "TODO TOX client node id" 392 cli = error "TODO TOX client node id"
@@ -416,18 +400,13 @@ queryK addr params kont = do
416 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do 400 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
417 (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult) 401 (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult)
418 case res of 402 case res of
419#ifdef VERSION_bencoding
420 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) 403 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
421 Right (R (KResponse {..})) -> 404 Right m -> case decodePayload m of
422 case fromBEncode respVals of 405 Right r -> case envelopeClass (r :: msg b) of
423 Right r -> pure $ kont raw r respIP 406 Response reflectedAddr -> pure $ kont raw (envelopePayload r) reflectedAddr
424#else 407 Error (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) -- XXX neccessary?
425 Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR" 408 Query _ -> throwIO $ QueryFailed ProtocolError "BUG!! UNREACHABLE"
426 Right (Tox.Message {..}) -> 409 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)
427 case S.decode msgPayload of
428 Right r -> pure $ kont raw r Nothing
429#endif
430 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)
431 410
432 case mres of 411 case mres of
433 Just res -> do 412 Just res -> do
@@ -477,43 +456,33 @@ prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
477-- If the handler make some 'query' normally it /should/ handle 456-- If the handler make some 'query' normally it /should/ handle
478-- corresponding 'QueryFailure's. 457-- corresponding 'QueryFailure's.
479-- 458--
480handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b) 459handler :: forall h a b msg raw. (Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b)
481 => (SockAddr -> a -> h b) -> Handler h msg raw 460 => QueryMethod msg -> (SockAddr -> a -> h b) -> Handler h msg raw
482handler body = (name, wrapper) 461handler name body = (name, wrapper)
483 where 462 where
484 Method name = method :: Method a b
485 wrapper :: SockAddr -> msg raw -> h (Either String (msg raw)) 463 wrapper :: SockAddr -> msg raw -> h (Either String (msg raw))
486 wrapper addr args = 464 wrapper addr args =
487 case decodePayload args of 465 case decodePayload args of
488 Left e -> pure $ Left e 466 Left e -> pure $ Left e
489 Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a) 467 Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a)
490 468
491runHandler :: MonadKRPC h m 469runHandler :: ( MonadKRPC h m raw msg
492#ifdef VERSION_bencoding 470 , Envelope msg
493 => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult 471 , Show (QueryMethod msg)
494#else 472 , Serialize (TransactionID msg))
495 => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult 473 => QueryMethod msg -> HandlerBody h msg raw -> SockAddr -> msg raw -> m (KResult msg raw)
496#endif 474runHandler meth h addr m = Lifted.catches wrapper failbacks
497runHandler h addr m = Lifted.catches wrapper failbacks
498 where 475 where
499 signature = querySignature (queryMethod m) (queryId m) addr 476 signature = querySignature meth (envelopeTransaction m) addr
500 477
501 wrapper = do 478 wrapper = do
502 $(logDebugS) "handler.quered" signature 479 $(logDebugS) "handler.quered" signature
503#ifdef VERSION_bencoding
504 result <- liftHandler (h addr (Q m))
505#else
506 result <- liftHandler (h addr m) 480 result <- liftHandler (h addr m)
507#endif
508 481
509 case result of 482 case result of
510 Left msg -> do 483 Left msg -> do
511 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg 484 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg
512#ifdef VERSION_bencoding 485 return $ Left $ KError ProtocolError (BC.pack msg) (envelopeTransaction m)
513 return $ Left $ KError ProtocolError (BC.pack msg) (queryId m)
514#else
515 return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m)
516#endif
517 486
518 Right a -> do -- KQueryArgs 487 Right a -> do -- KQueryArgs
519 $(logDebugS) "handler.success" signature 488 $(logDebugS) "handler.success" signature
@@ -522,41 +491,30 @@ runHandler h addr m = Lifted.catches wrapper failbacks
522 failbacks = 491 failbacks =
523 [ E.Handler $ \ (e :: HandlerFailure) -> do 492 [ E.Handler $ \ (e :: HandlerFailure) -> do
524 $(logDebugS) "handler.failed" signature 493 $(logDebugS) "handler.failed" signature
525#ifdef VERSION_bencoding 494 return $ Left $ KError ProtocolError (prettyHF e) (envelopeTransaction m)
526 return $ Left $ KError ProtocolError (prettyHF e) (queryId m)
527#else
528 return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m)
529#endif
530 495
531 496
532 -- may happen if handler makes query and fail 497 -- may happen if handler makes query and fail
533 , E.Handler $ \ (e :: QueryFailure) -> do 498 , E.Handler $ \ (e :: QueryFailure) -> do
534#ifdef VERSION_bencoding 499 return $ Left $ KError ServerError (prettyQF e) (envelopeTransaction m)
535 return $ Left $ KError ServerError (prettyQF e) (queryId m)
536#else
537 return $ Left $ decodeError "TODO TOX ServerError" (queryId m)
538#endif
539 500
540 -- since handler thread exit after sendMessage we can safely 501 -- since handler thread exit after sendMessage we can safely
541 -- suppress async exception here 502 -- suppress async exception here
542 , E.Handler $ \ (e :: SomeException) -> do 503 , E.Handler $ \ (e :: SomeException) -> do
543#ifdef VERSION_bencoding 504 return $ Left $ KError GenericError (BC.pack (show e)) (envelopeTransaction m)
544 return $ Left $ KError GenericError (BC.pack (show e)) (queryId m)
545#else
546 return $ Left $ decodeError "TODO TOX GenericError" (queryId m)
547#endif
548 ] 505 ]
549 506
550dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult 507dispatchHandler :: ( MonadKRPC h m raw msg
551dispatchHandler q addr = do 508 , Eq (QueryMethod msg)
509 , Show (QueryMethod msg)
510 , Serialize (TransactionID msg)
511 , Envelope msg
512 ) => QueryMethod msg -> msg raw -> SockAddr -> m (KResult msg raw)
513dispatchHandler meth q addr = do
552 Manager {..} <- getManager 514 Manager {..} <- getManager
553 case L.lookup (queryMethod q) handlers of 515 case L.lookup meth handlers of
554#ifdef VERSION_bencoding 516 Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q)
555 Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q) 517 Just h -> runHandler meth h addr q
556#else
557 Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q)
558#endif
559 Just h -> runHandler h addr q
560 518
561{----------------------------------------------------------------------- 519{-----------------------------------------------------------------------
562-- Listener 520-- Listener
@@ -569,71 +527,75 @@ dispatchHandler q addr = do
569-- peer B fork too many threads 527-- peer B fork too many threads
570-- ... space leak 528-- ... space leak
571-- 529--
572handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m () 530handleQuery :: ( MonadKRPC h m raw msg
573handleQuery raw q addr = void $ fork $ do 531 , WireFormat raw msg
532 , Eq (QueryMethod msg)
533 , Show (QueryMethod msg)
534 , Serialize (TransactionID msg)
535 ) => QueryMethod msg -> raw -> msg raw -> SockAddr -> m ()
536handleQuery meth raw q addr = void $ fork $ do
574 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" 537 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
575 Manager {..} <- getManager 538 Manager {..} <- getManager
576 res <- dispatchHandler q addr 539 res <- dispatchHandler meth q addr
577#ifdef VERSION_bencoding 540#ifdef VERSION_bencoding
578 let res' = either E id res 541 let res' = either buildError Just res
579 resbe = either toBEncode toBEncode res 542 ctx = error "TODO TOX ToxCipherContext 2 or () for Mainline"
580 $(logOther "q") $ T.unlines 543 resbs = fmap (encodeHeaders ctx) res' :: Maybe BS.ByteString
581 [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) 544-- TODO: Generalize this debug print.
582 , "==>" 545-- resbe = either toBEncode toBEncode res
583 , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) 546-- $(logOther "q") $ T.unlines
584 ] 547-- [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
585 sendMessage sock addr $ encodeHeaders () res' 548-- , "==>"
549-- , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
550-- ]
551 maybe (return ()) (sendMessage sock addr) resbs
586#else 552#else
587 -- Errors not sent for Tox. 553 -- Errors not sent for Tox.
588 let ctx = error "TODO TOX ToxCipherContext 2" 554 let ctx = error "TODO TOX ToxCipherContext 2"
589 either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res 555 either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res
590#endif 556#endif
591 557
592handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m () 558handleResponse :: ( MonadKRPC h m raw msg
559 , Ord (TransactionID msg)
560 , Envelope msg
561 ) => raw -> KResult msg raw -> SockAddr -> m ()
593handleResponse raw result addr = do 562handleResponse raw result addr = do
594 Manager {..} <- getManager 563 Manager {..} <- getManager
595 liftIO $ do 564 liftIO $ do
596#ifdef VERSION_bencoding
597 let resultId = either errorId envelopeTransaction result 565 let resultId = either errorId envelopeTransaction result
598#else
599 let resultId = either Tox.msgNonce Tox.msgNonce result
600#endif
601 mcall <- unregisterQuery (resultId, addr) pendingCalls 566 mcall <- unregisterQuery (resultId, addr) pendingCalls
602 case mcall of 567 case mcall of
603 Nothing -> return () 568 Nothing -> return ()
604 Just ares -> putMVar ares (raw,result) 569 Just ares -> putMVar ares (raw,result)
605 570
606#ifdef VERSION_bencoding 571data Protocol raw (msg :: * -> *) = Protocol { rawProxy :: !(Proxy raw)
607handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m () 572 , msgProxy :: !(Proxy msg)
608handleMessage raw (Q q) = handleQuery raw q 573 }
609handleMessage raw (R r) = handleResponse raw (Right (R r)) 574
610handleMessage raw (E e) = handleResponse raw (Left e) 575listener :: forall h m raw msg.
611#else 576 ( MonadKRPC h m raw msg
612handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m () 577 , WireFormat raw msg
613handleMessage raw q | Tox.isQuery q = handleQuery raw q 578 , Ord (TransactionID msg)
614handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r) 579 , Eq (QueryMethod msg)
615handleMessage raw e | Tox.isError e = handleResponse raw (Left e) 580 , Show (QueryMethod msg)
616#endif 581 , Serialize (TransactionID msg)
617 582 ) => Protocol raw msg -> m ()
618listener :: MonadKRPC h m => m () 583listener p = do
619listener = do
620 Manager {..} <- getManager 584 Manager {..} <- getManager
621 fix $ \again -> do 585 fix $ \again -> do
622 let ctx = error "TODO TOX ToxCipherContext 3" 586 let ctx = error "TODO TOX ToxCipherContext or () for Mainline"
623 (bs, addr) <- liftIO $ do 587 (bs, addr) <- liftIO $ do
624 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) 588 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
625#ifdef VERSION_bencoding 589 case parsePacket (msgProxy p) bs >>= \r -> (,) r <$> decodeHeaders ctx r of
626 case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of 590 Left e -> -- XXX: Send parse failure message?
627#else 591 -- liftIO $ sendMessage sock addr $ encodeHeaders ctx (unknownMessage e)
628 case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of 592 return () -- Without transaction id, error message isn't very useful.
629#endif 593 Right (raw,m) ->
630 -- TODO ignore unknown messages at all? 594 case envelopeClass m of
631#ifdef VERSION_bencoding 595 Query meth -> handleQuery meth (raw::raw) m addr
632 Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage) 596 Response _ -> handleResponse raw (Right m) addr
633#else 597 Error e -> handleResponse raw (Left e) addr
634 Left _ -> return () -- TODO TOX send unknownMessage error 598
635#endif
636 Right (raw,m) -> handleMessage raw m addr
637 again 599 again
638 where 600 where
639 exceptions :: IOError -> IO (BS.ByteString, SockAddr) 601 exceptions :: IOError -> IO (BS.ByteString, SockAddr)
@@ -644,11 +606,17 @@ listener = do
644 606
645-- | Should be run before any 'query', otherwise they will never 607-- | Should be run before any 'query', otherwise they will never
646-- succeed. 608-- succeed.
647listen :: MonadKRPC h m => m () 609listen :: ( MonadKRPC h m raw msg
648listen = do 610 , WireFormat raw msg
611 , Ord (TransactionID msg)
612 , Eq (QueryMethod msg)
613 , Show (QueryMethod msg)
614 , Serialize (TransactionID msg)
615 ) => Protocol raw msg -> m ()
616listen p = do
649 Manager {..} <- getManager 617 Manager {..} <- getManager
650 tid <- fork $ do 618 tid <- fork $ do
651 myThreadId >>= liftIO . flip labelThread "KRPC.listen" 619 myThreadId >>= liftIO . flip labelThread "KRPC.listen"
652 listener `Lifted.finally` 620 listener p `Lifted.finally`
653 liftIO (takeMVar listenerThread) 621 liftIO (takeMVar listenerThread)
654 liftIO $ putMVar listenerThread tid 622 liftIO $ putMVar listenerThread tid