diff options
author | joe <joe@jerkface.net> | 2017-06-08 23:26:30 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-06-08 23:26:30 -0400 |
commit | 84798bfef62a001ded1bd628d846612f0b0ade80 (patch) | |
tree | 6a66e1d8fa014bea6f6562650134440a5a515f56 /src/Network/DatagramServer.hs | |
parent | cb2bd0bf4b5977ef6ec7ca7ab9ac0189457c2250 (diff) |
Generalized Network.DatagramServer
Diffstat (limited to 'src/Network/DatagramServer.hs')
-rw-r--r-- | src/Network/DatagramServer.hs | 292 |
1 files changed, 130 insertions, 162 deletions
diff --git a/src/Network/DatagramServer.hs b/src/Network/DatagramServer.hs index 21300108..e1bf91c5 100644 --- a/src/Network/DatagramServer.hs +++ b/src/Network/DatagramServer.hs | |||
@@ -60,6 +60,7 @@ | |||
60 | {-# LANGUAGE FunctionalDependencies #-} | 60 | {-# LANGUAGE FunctionalDependencies #-} |
61 | {-# LANGUAGE DeriveDataTypeable #-} | 61 | {-# LANGUAGE DeriveDataTypeable #-} |
62 | {-# LANGUAGE TemplateHaskell #-} | 62 | {-# LANGUAGE TemplateHaskell #-} |
63 | {-# LANGUAGE KindSignatures #-} | ||
63 | module Network.DatagramServer | 64 | module Network.DatagramServer |
64 | ( -- * Methods | 65 | ( -- * Methods |
65 | Method | 66 | Method |
@@ -88,6 +89,7 @@ module Network.DatagramServer | |||
88 | , withManager | 89 | , withManager |
89 | , isActive | 90 | , isActive |
90 | , listen | 91 | , listen |
92 | , Protocol(..) | ||
91 | 93 | ||
92 | -- * Re-exports | 94 | -- * Re-exports |
93 | , ErrorCode (..) | 95 | , ErrorCode (..) |
@@ -96,7 +98,6 @@ module Network.DatagramServer | |||
96 | 98 | ||
97 | import Data.Default.Class | 99 | import Data.Default.Class |
98 | import Network.DatagramServer.Mainline | 100 | import Network.DatagramServer.Mainline |
99 | import Network.KRPC.Method | ||
100 | import Network.Socket (SockAddr (..)) | 101 | import Network.Socket (SockAddr (..)) |
101 | 102 | ||
102 | import Control.Applicative | 103 | import Control.Applicative |
@@ -192,42 +193,38 @@ validateOptions Options {..} | |||
192 | -- Options | 193 | -- Options |
193 | -----------------------------------------------------------------------} | 194 | -----------------------------------------------------------------------} |
194 | 195 | ||
195 | type KResult = Either KError KMessage -- Response | 196 | type KResult msg raw = Either (KError (TransactionID msg)) (msg raw)-- Response |
196 | 197 | ||
197 | type TransactionCounter = IORef Int | 198 | type TransactionCounter = IORef Int |
198 | type CallId = (TransactionId, SockAddr) | 199 | type CallId msg = (TransactionID msg, SockAddr) |
199 | type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response) | 200 | type CallRes msg raw = MVar (raw, KResult msg raw) -- (raw response, decoded response) |
200 | type PendingCalls = IORef (Map CallId CallRes) | 201 | type PendingCalls msg raw = IORef (Map (CallId msg) (CallRes msg raw)) |
201 | 202 | ||
202 | type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v)) | 203 | type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v)) |
203 | 204 | ||
204 | -- | Handler is a function which will be invoked then some /remote/ | 205 | -- | Handler is a function which will be invoked then some /remote/ |
205 | -- node querying /this/ node. | 206 | -- node querying /this/ node. |
206 | type Handler h msg v = (MethodName, HandlerBody h msg v) | 207 | type Handler h msg v = (QueryMethod msg, HandlerBody h msg v) |
207 | 208 | ||
208 | -- | Keep track pending queries made by /this/ node and handle queries | 209 | -- | Keep track pending queries made by /this/ node and handle queries |
209 | -- made by /remote/ nodes. | 210 | -- made by /remote/ nodes. |
210 | data Manager h = Manager | 211 | data Manager h raw msg = Manager |
211 | { sock :: !Socket | 212 | { sock :: !Socket |
212 | , options :: !Options | 213 | , options :: !Options |
213 | , listenerThread :: !(MVar ThreadId) | 214 | , listenerThread :: !(MVar ThreadId) |
214 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | 215 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter |
215 | , pendingCalls :: {-# UNPACK #-} !PendingCalls | 216 | , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw) |
216 | #ifdef VERSION_bencoding | 217 | , handlers :: [Handler h msg raw] |
217 | , handlers :: [Handler h KMessageOf BValue] | ||
218 | #else | ||
219 | , handlers :: [Handler h KMessageOf BC.ByteString] | ||
220 | #endif | ||
221 | } | 218 | } |
222 | 219 | ||
223 | -- | A monad which can perform or handle queries. | 220 | -- | A monad which can perform or handle queries. |
224 | class (MonadBaseControl IO m, MonadLogger m, MonadIO m) | 221 | class (MonadBaseControl IO m, MonadLogger m, MonadIO m) |
225 | => MonadKRPC h m | m -> h where | 222 | => MonadKRPC h m raw msg | m -> h, m -> raw, m -> msg where |
226 | 223 | ||
227 | -- | Ask for manager. | 224 | -- | Ask for manager. |
228 | getManager :: m (Manager h) | 225 | getManager :: m (Manager h raw msg) |
229 | 226 | ||
230 | default getManager :: MonadReader (Manager h) m => m (Manager h) | 227 | default getManager :: MonadReader (Manager h raw msg) m => m (Manager h raw msg) |
231 | getManager = ask | 228 | getManager = ask |
232 | 229 | ||
233 | -- | Can be used to add logging for instance. | 230 | -- | Can be used to add logging for instance. |
@@ -237,7 +234,7 @@ class (MonadBaseControl IO m, MonadLogger m, MonadIO m) | |||
237 | liftHandler = id | 234 | liftHandler = id |
238 | 235 | ||
239 | instance (MonadBaseControl IO h, MonadLogger h, MonadIO h) | 236 | instance (MonadBaseControl IO h, MonadLogger h, MonadIO h) |
240 | => MonadKRPC h (ReaderT (Manager h) h) where | 237 | => MonadKRPC h (ReaderT (Manager h raw msg) h) raw msg where |
241 | 238 | ||
242 | liftHandler = lift | 239 | liftHandler = lift |
243 | 240 | ||
@@ -251,12 +248,8 @@ sockAddrFamily (SockAddrCan _ ) = AF_CAN | |||
251 | -- run 'listen'. | 248 | -- run 'listen'. |
252 | newManager :: Options -- ^ various protocol options; | 249 | newManager :: Options -- ^ various protocol options; |
253 | -> SockAddr -- ^ address to listen on; | 250 | -> SockAddr -- ^ address to listen on; |
254 | #ifdef VERSION_bencoding | 251 | -> [Handler h msg raw] -- ^ handlers to run on incoming queries. |
255 | -> [Handler h KMessageOf BValue] -- ^ handlers to run on incoming queries. | 252 | -> IO (Manager h raw msg) -- ^ new rpc manager. |
256 | #else | ||
257 | -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries. | ||
258 | #endif | ||
259 | -> IO (Manager h) -- ^ new rpc manager. | ||
260 | newManager opts @ Options {..} servAddr handlers = do | 253 | newManager opts @ Options {..} servAddr handlers = do |
261 | validateOptions opts | 254 | validateOptions opts |
262 | sock <- bindServ | 255 | sock <- bindServ |
@@ -274,7 +267,7 @@ newManager opts @ Options {..} servAddr handlers = do | |||
274 | return sock | 267 | return sock |
275 | 268 | ||
276 | -- | Unblock all pending calls and close socket. | 269 | -- | Unblock all pending calls and close socket. |
277 | closeManager :: Manager m -> IO () | 270 | closeManager :: Manager m raw msg -> IO () |
278 | closeManager Manager {..} = do | 271 | closeManager Manager {..} = do |
279 | maybe (return ()) killThread =<< tryTakeMVar listenerThread | 272 | maybe (return ()) killThread =<< tryTakeMVar listenerThread |
280 | -- TODO unblock calls | 273 | -- TODO unblock calls |
@@ -282,18 +275,14 @@ closeManager Manager {..} = do | |||
282 | 275 | ||
283 | -- | Check if the manager is still active. Manager becomes active | 276 | -- | Check if the manager is still active. Manager becomes active |
284 | -- until 'closeManager' called. | 277 | -- until 'closeManager' called. |
285 | isActive :: Manager m -> IO Bool | 278 | isActive :: Manager m raw msg -> IO Bool |
286 | isActive Manager {..} = liftIO $ isBound sock | 279 | isActive Manager {..} = liftIO $ isBound sock |
287 | {-# INLINE isActive #-} | 280 | {-# INLINE isActive #-} |
288 | 281 | ||
289 | -- | Normally you should use Control.Monad.Trans.Resource.allocate | 282 | -- | Normally you should use Control.Monad.Trans.Resource.allocate |
290 | -- function. | 283 | -- function. |
291 | #ifdef VERSION_bencoding | 284 | withManager :: Options -> SockAddr -> [Handler h msg raw] |
292 | withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue] | 285 | -> (Manager h raw msg -> IO a) -> IO a |
293 | #else | ||
294 | withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString] | ||
295 | #endif | ||
296 | -> (Manager h -> IO a) -> IO a | ||
297 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager | 286 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager |
298 | 287 | ||
299 | {----------------------------------------------------------------------- | 288 | {----------------------------------------------------------------------- |
@@ -301,15 +290,12 @@ withManager opts addr hs = bracket (newManager opts addr hs) closeManager | |||
301 | -----------------------------------------------------------------------} | 290 | -----------------------------------------------------------------------} |
302 | 291 | ||
303 | -- TODO prettify log messages | 292 | -- TODO prettify log messages |
304 | querySignature :: MethodName -> TransactionId -> SockAddr -> Text | 293 | querySignature :: ( Show ( QueryMethod msg ) |
294 | , Serialize ( TransactionID msg ) ) | ||
295 | => QueryMethod msg -> TransactionID msg -> SockAddr -> Text | ||
305 | querySignature name transaction addr = T.concat | 296 | querySignature name transaction addr = T.concat |
306 | #ifdef VERSION_bencoding | ||
307 | [ "&", T.decodeUtf8 name | ||
308 | , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction | ||
309 | #else | ||
310 | [ "&", T.pack (show name) | 297 | [ "&", T.pack (show name) |
311 | , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction) | 298 | , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction) |
312 | #endif | ||
313 | , " @", T.pack (show addr) | 299 | , " @", T.pack (show addr) |
314 | ] | 300 | ] |
315 | 301 | ||
@@ -332,23 +318,19 @@ sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m () | |||
332 | sendMessage sock addr a = do | 318 | sendMessage sock addr a = do |
333 | liftIO $ sendManyTo sock [a] addr | 319 | liftIO $ sendManyTo sock [a] addr |
334 | 320 | ||
335 | genTransactionId :: TransactionCounter -> IO TransactionId | 321 | genTransactionId :: Envelope msg => TransactionCounter -> IO (TransactionID msg) |
336 | genTransactionId ref = do | 322 | genTransactionId ref = do |
337 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) | 323 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) |
338 | #ifdef VERSION_bencoding | 324 | uniqueTransactionId cur |
339 | return $ BC.pack (show cur) | ||
340 | #else | ||
341 | return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ') | ||
342 | #endif | ||
343 | 325 | ||
344 | -- | How many times 'query' call have been performed. | 326 | -- | How many times 'query' call have been performed. |
345 | getQueryCount :: MonadKRPC h m => m Int | 327 | getQueryCount :: MonadKRPC h m raw msg => m Int |
346 | getQueryCount = do | 328 | getQueryCount = do |
347 | Manager {..} <- getManager | 329 | Manager {..} <- getManager |
348 | curTrans <- liftIO $ readIORef transactionCounter | 330 | curTrans <- liftIO $ readIORef transactionCounter |
349 | return $ curTrans - optSeedTransaction options | 331 | return $ curTrans - optSeedTransaction options |
350 | 332 | ||
351 | registerQuery :: CallId -> PendingCalls -> IO CallRes | 333 | registerQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (CallRes msg raw) |
352 | registerQuery cid ref = do | 334 | registerQuery cid ref = do |
353 | ares <- newEmptyMVar | 335 | ares <- newEmptyMVar |
354 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) | 336 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) |
@@ -356,7 +338,7 @@ registerQuery cid ref = do | |||
356 | 338 | ||
357 | -- simultaneous M.lookup and M.delete guarantees that we never get two | 339 | -- simultaneous M.lookup and M.delete guarantees that we never get two |
358 | -- or more responses to the same query | 340 | -- or more responses to the same query |
359 | unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) | 341 | unregisterQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (Maybe (CallRes msg raw)) |
360 | unregisterQuery cid ref = do | 342 | unregisterQuery cid ref = do |
361 | atomicModifyIORef' ref $ swap . | 343 | atomicModifyIORef' ref $ swap . |
362 | M.updateLookupWithKey (const (const Nothing)) cid | 344 | M.updateLookupWithKey (const (const Nothing)) cid |
@@ -374,35 +356,37 @@ sendQuery sock addr q = handle sockError $ sendMessage sock addr q | |||
374 | -- This function should throw 'QueryFailure' exception if quered node | 356 | -- This function should throw 'QueryFailure' exception if quered node |
375 | -- respond with @error@ message or the query timeout expires. | 357 | -- respond with @error@ message or the query timeout expires. |
376 | -- | 358 | -- |
377 | query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b | 359 | query :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg, KRPC a b) => QueryMethod msg -> SockAddr -> a -> m b |
378 | query addr params = queryK addr params (\_ x _ -> x) | 360 | query meth addr params = queryK meth addr params (\_ x _ -> x) |
379 | 361 | ||
380 | -- | Like 'query' but possibly returns your externally routable IP address. | 362 | -- | Like 'query' but possibly returns your externally routable IP address. |
381 | query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP) | 363 | query' :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg, KRPC a b) => QueryMethod msg -> SockAddr -> a -> m (b, Maybe ReflectedIP) |
382 | query' addr params = queryK addr params (const (,)) | 364 | query' meth addr params = queryK meth addr params (const (,)) |
383 | 365 | ||
384 | -- | Enqueue a query, but give us the complete BEncoded content sent by the | 366 | -- | Enqueue a query, but give us the complete BEncoded content sent by the |
385 | -- remote Node. This is useful for handling extensions that this library does | 367 | -- remote Node. This is useful for handling extensions that this library does |
386 | -- not otherwise support. | 368 | -- not otherwise support. |
387 | queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs) | 369 | queryRaw :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg, KRPC a b) => QueryMethod msg -> SockAddr -> a -> m (b, raw) |
388 | queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) | 370 | queryRaw meth addr params = queryK meth addr params (\raw x _ -> (x,raw)) |
389 | 371 | ||
390 | queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => | 372 | queryK :: forall h m a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg, KRPC a b) => |
391 | SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x | 373 | QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> m x |
392 | queryK addr params kont = do | 374 | queryK meth addr params kont = do |
393 | Manager {..} <- getManager | 375 | Manager {..} <- getManager |
394 | tid <- liftIO $ genTransactionId transactionCounter | 376 | tid <- liftIO $ genTransactionId transactionCounter |
395 | let queryMethod = method :: Method a b | 377 | -- let queryMethod = method :: Method a b |
396 | let signature = querySignature (methodName queryMethod) tid addr | 378 | let signature = querySignature meth tid addr |
397 | $(logDebugS) "query.sending" signature | 379 | $(logDebugS) "query.sending" signature |
398 | 380 | ||
399 | mres <- liftIO $ do | 381 | mres <- liftIO $ do |
400 | ares <- registerQuery (tid, addr) pendingCalls | 382 | ares <- registerQuery (tid, addr) pendingCalls |
401 | 383 | ||
384 | let cli = error "TODO TOX client node id" | ||
385 | ctx = error "TODO TOX ToxCipherContext or () for Mainline" | ||
386 | q <- buildQuery cli addr meth tid params | ||
387 | let qb = encodePayload (q :: msg a) :: msg raw | ||
388 | qbs = encodeHeaders ctx qb | ||
402 | #ifdef VERSION_bencoding | 389 | #ifdef VERSION_bencoding |
403 | let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid) | ||
404 | qb = encodePayload q :: KMessage | ||
405 | qbs = encodeHeaders () qb :: BC.ByteString | ||
406 | #else | 390 | #else |
407 | let q = Tox.Message (methodName queryMethod) cli tid params | 391 | let q = Tox.Message (methodName queryMethod) cli tid params |
408 | cli = error "TODO TOX client node id" | 392 | cli = error "TODO TOX client node id" |
@@ -416,18 +400,13 @@ queryK addr params kont = do | |||
416 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do | 400 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do |
417 | (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult) | 401 | (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult) |
418 | case res of | 402 | case res of |
419 | #ifdef VERSION_bencoding | ||
420 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) | 403 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) |
421 | Right (R (KResponse {..})) -> | 404 | Right m -> case decodePayload m of |
422 | case fromBEncode respVals of | 405 | Right r -> case envelopeClass (r :: msg b) of |
423 | Right r -> pure $ kont raw r respIP | 406 | Response reflectedAddr -> pure $ kont raw (envelopePayload r) reflectedAddr |
424 | #else | 407 | Error (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) -- XXX neccessary? |
425 | Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR" | 408 | Query _ -> throwIO $ QueryFailed ProtocolError "BUG!! UNREACHABLE" |
426 | Right (Tox.Message {..}) -> | 409 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) |
427 | case S.decode msgPayload of | ||
428 | Right r -> pure $ kont raw r Nothing | ||
429 | #endif | ||
430 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) | ||
431 | 410 | ||
432 | case mres of | 411 | case mres of |
433 | Just res -> do | 412 | Just res -> do |
@@ -477,43 +456,33 @@ prettyQF e = T.encodeUtf8 $ "handler fail while performing query: " | |||
477 | -- If the handler make some 'query' normally it /should/ handle | 456 | -- If the handler make some 'query' normally it /should/ handle |
478 | -- corresponding 'QueryFailure's. | 457 | -- corresponding 'QueryFailure's. |
479 | -- | 458 | -- |
480 | handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b) | 459 | handler :: forall h a b msg raw. (Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b) |
481 | => (SockAddr -> a -> h b) -> Handler h msg raw | 460 | => QueryMethod msg -> (SockAddr -> a -> h b) -> Handler h msg raw |
482 | handler body = (name, wrapper) | 461 | handler name body = (name, wrapper) |
483 | where | 462 | where |
484 | Method name = method :: Method a b | ||
485 | wrapper :: SockAddr -> msg raw -> h (Either String (msg raw)) | 463 | wrapper :: SockAddr -> msg raw -> h (Either String (msg raw)) |
486 | wrapper addr args = | 464 | wrapper addr args = |
487 | case decodePayload args of | 465 | case decodePayload args of |
488 | Left e -> pure $ Left e | 466 | Left e -> pure $ Left e |
489 | Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a) | 467 | Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a) |
490 | 468 | ||
491 | runHandler :: MonadKRPC h m | 469 | runHandler :: ( MonadKRPC h m raw msg |
492 | #ifdef VERSION_bencoding | 470 | , Envelope msg |
493 | => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult | 471 | , Show (QueryMethod msg) |
494 | #else | 472 | , Serialize (TransactionID msg)) |
495 | => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult | 473 | => QueryMethod msg -> HandlerBody h msg raw -> SockAddr -> msg raw -> m (KResult msg raw) |
496 | #endif | 474 | runHandler meth h addr m = Lifted.catches wrapper failbacks |
497 | runHandler h addr m = Lifted.catches wrapper failbacks | ||
498 | where | 475 | where |
499 | signature = querySignature (queryMethod m) (queryId m) addr | 476 | signature = querySignature meth (envelopeTransaction m) addr |
500 | 477 | ||
501 | wrapper = do | 478 | wrapper = do |
502 | $(logDebugS) "handler.quered" signature | 479 | $(logDebugS) "handler.quered" signature |
503 | #ifdef VERSION_bencoding | ||
504 | result <- liftHandler (h addr (Q m)) | ||
505 | #else | ||
506 | result <- liftHandler (h addr m) | 480 | result <- liftHandler (h addr m) |
507 | #endif | ||
508 | 481 | ||
509 | case result of | 482 | case result of |
510 | Left msg -> do | 483 | Left msg -> do |
511 | $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg | 484 | $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg |
512 | #ifdef VERSION_bencoding | 485 | return $ Left $ KError ProtocolError (BC.pack msg) (envelopeTransaction m) |
513 | return $ Left $ KError ProtocolError (BC.pack msg) (queryId m) | ||
514 | #else | ||
515 | return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m) | ||
516 | #endif | ||
517 | 486 | ||
518 | Right a -> do -- KQueryArgs | 487 | Right a -> do -- KQueryArgs |
519 | $(logDebugS) "handler.success" signature | 488 | $(logDebugS) "handler.success" signature |
@@ -522,41 +491,30 @@ runHandler h addr m = Lifted.catches wrapper failbacks | |||
522 | failbacks = | 491 | failbacks = |
523 | [ E.Handler $ \ (e :: HandlerFailure) -> do | 492 | [ E.Handler $ \ (e :: HandlerFailure) -> do |
524 | $(logDebugS) "handler.failed" signature | 493 | $(logDebugS) "handler.failed" signature |
525 | #ifdef VERSION_bencoding | 494 | return $ Left $ KError ProtocolError (prettyHF e) (envelopeTransaction m) |
526 | return $ Left $ KError ProtocolError (prettyHF e) (queryId m) | ||
527 | #else | ||
528 | return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m) | ||
529 | #endif | ||
530 | 495 | ||
531 | 496 | ||
532 | -- may happen if handler makes query and fail | 497 | -- may happen if handler makes query and fail |
533 | , E.Handler $ \ (e :: QueryFailure) -> do | 498 | , E.Handler $ \ (e :: QueryFailure) -> do |
534 | #ifdef VERSION_bencoding | 499 | return $ Left $ KError ServerError (prettyQF e) (envelopeTransaction m) |
535 | return $ Left $ KError ServerError (prettyQF e) (queryId m) | ||
536 | #else | ||
537 | return $ Left $ decodeError "TODO TOX ServerError" (queryId m) | ||
538 | #endif | ||
539 | 500 | ||
540 | -- since handler thread exit after sendMessage we can safely | 501 | -- since handler thread exit after sendMessage we can safely |
541 | -- suppress async exception here | 502 | -- suppress async exception here |
542 | , E.Handler $ \ (e :: SomeException) -> do | 503 | , E.Handler $ \ (e :: SomeException) -> do |
543 | #ifdef VERSION_bencoding | 504 | return $ Left $ KError GenericError (BC.pack (show e)) (envelopeTransaction m) |
544 | return $ Left $ KError GenericError (BC.pack (show e)) (queryId m) | ||
545 | #else | ||
546 | return $ Left $ decodeError "TODO TOX GenericError" (queryId m) | ||
547 | #endif | ||
548 | ] | 505 | ] |
549 | 506 | ||
550 | dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult | 507 | dispatchHandler :: ( MonadKRPC h m raw msg |
551 | dispatchHandler q addr = do | 508 | , Eq (QueryMethod msg) |
509 | , Show (QueryMethod msg) | ||
510 | , Serialize (TransactionID msg) | ||
511 | , Envelope msg | ||
512 | ) => QueryMethod msg -> msg raw -> SockAddr -> m (KResult msg raw) | ||
513 | dispatchHandler meth q addr = do | ||
552 | Manager {..} <- getManager | 514 | Manager {..} <- getManager |
553 | case L.lookup (queryMethod q) handlers of | 515 | case L.lookup meth handlers of |
554 | #ifdef VERSION_bencoding | 516 | Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q) |
555 | Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q) | 517 | Just h -> runHandler meth h addr q |
556 | #else | ||
557 | Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q) | ||
558 | #endif | ||
559 | Just h -> runHandler h addr q | ||
560 | 518 | ||
561 | {----------------------------------------------------------------------- | 519 | {----------------------------------------------------------------------- |
562 | -- Listener | 520 | -- Listener |
@@ -569,71 +527,75 @@ dispatchHandler q addr = do | |||
569 | -- peer B fork too many threads | 527 | -- peer B fork too many threads |
570 | -- ... space leak | 528 | -- ... space leak |
571 | -- | 529 | -- |
572 | handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m () | 530 | handleQuery :: ( MonadKRPC h m raw msg |
573 | handleQuery raw q addr = void $ fork $ do | 531 | , WireFormat raw msg |
532 | , Eq (QueryMethod msg) | ||
533 | , Show (QueryMethod msg) | ||
534 | , Serialize (TransactionID msg) | ||
535 | ) => QueryMethod msg -> raw -> msg raw -> SockAddr -> m () | ||
536 | handleQuery meth raw q addr = void $ fork $ do | ||
574 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" | 537 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" |
575 | Manager {..} <- getManager | 538 | Manager {..} <- getManager |
576 | res <- dispatchHandler q addr | 539 | res <- dispatchHandler meth q addr |
577 | #ifdef VERSION_bencoding | 540 | #ifdef VERSION_bencoding |
578 | let res' = either E id res | 541 | let res' = either buildError Just res |
579 | resbe = either toBEncode toBEncode res | 542 | ctx = error "TODO TOX ToxCipherContext 2 or () for Mainline" |
580 | $(logOther "q") $ T.unlines | 543 | resbs = fmap (encodeHeaders ctx) res' :: Maybe BS.ByteString |
581 | [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) | 544 | -- TODO: Generalize this debug print. |
582 | , "==>" | 545 | -- resbe = either toBEncode toBEncode res |
583 | , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) | 546 | -- $(logOther "q") $ T.unlines |
584 | ] | 547 | -- [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) |
585 | sendMessage sock addr $ encodeHeaders () res' | 548 | -- , "==>" |
549 | -- , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) | ||
550 | -- ] | ||
551 | maybe (return ()) (sendMessage sock addr) resbs | ||
586 | #else | 552 | #else |
587 | -- Errors not sent for Tox. | 553 | -- Errors not sent for Tox. |
588 | let ctx = error "TODO TOX ToxCipherContext 2" | 554 | let ctx = error "TODO TOX ToxCipherContext 2" |
589 | either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res | 555 | either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res |
590 | #endif | 556 | #endif |
591 | 557 | ||
592 | handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m () | 558 | handleResponse :: ( MonadKRPC h m raw msg |
559 | , Ord (TransactionID msg) | ||
560 | , Envelope msg | ||
561 | ) => raw -> KResult msg raw -> SockAddr -> m () | ||
593 | handleResponse raw result addr = do | 562 | handleResponse raw result addr = do |
594 | Manager {..} <- getManager | 563 | Manager {..} <- getManager |
595 | liftIO $ do | 564 | liftIO $ do |
596 | #ifdef VERSION_bencoding | ||
597 | let resultId = either errorId envelopeTransaction result | 565 | let resultId = either errorId envelopeTransaction result |
598 | #else | ||
599 | let resultId = either Tox.msgNonce Tox.msgNonce result | ||
600 | #endif | ||
601 | mcall <- unregisterQuery (resultId, addr) pendingCalls | 566 | mcall <- unregisterQuery (resultId, addr) pendingCalls |
602 | case mcall of | 567 | case mcall of |
603 | Nothing -> return () | 568 | Nothing -> return () |
604 | Just ares -> putMVar ares (raw,result) | 569 | Just ares -> putMVar ares (raw,result) |
605 | 570 | ||
606 | #ifdef VERSION_bencoding | 571 | data Protocol raw (msg :: * -> *) = Protocol { rawProxy :: !(Proxy raw) |
607 | handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m () | 572 | , msgProxy :: !(Proxy msg) |
608 | handleMessage raw (Q q) = handleQuery raw q | 573 | } |
609 | handleMessage raw (R r) = handleResponse raw (Right (R r)) | 574 | |
610 | handleMessage raw (E e) = handleResponse raw (Left e) | 575 | listener :: forall h m raw msg. |
611 | #else | 576 | ( MonadKRPC h m raw msg |
612 | handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m () | 577 | , WireFormat raw msg |
613 | handleMessage raw q | Tox.isQuery q = handleQuery raw q | 578 | , Ord (TransactionID msg) |
614 | handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r) | 579 | , Eq (QueryMethod msg) |
615 | handleMessage raw e | Tox.isError e = handleResponse raw (Left e) | 580 | , Show (QueryMethod msg) |
616 | #endif | 581 | , Serialize (TransactionID msg) |
617 | 582 | ) => Protocol raw msg -> m () | |
618 | listener :: MonadKRPC h m => m () | 583 | listener p = do |
619 | listener = do | ||
620 | Manager {..} <- getManager | 584 | Manager {..} <- getManager |
621 | fix $ \again -> do | 585 | fix $ \again -> do |
622 | let ctx = error "TODO TOX ToxCipherContext 3" | 586 | let ctx = error "TODO TOX ToxCipherContext or () for Mainline" |
623 | (bs, addr) <- liftIO $ do | 587 | (bs, addr) <- liftIO $ do |
624 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) | 588 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) |
625 | #ifdef VERSION_bencoding | 589 | case parsePacket (msgProxy p) bs >>= \r -> (,) r <$> decodeHeaders ctx r of |
626 | case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of | 590 | Left e -> -- XXX: Send parse failure message? |
627 | #else | 591 | -- liftIO $ sendMessage sock addr $ encodeHeaders ctx (unknownMessage e) |
628 | case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of | 592 | return () -- Without transaction id, error message isn't very useful. |
629 | #endif | 593 | Right (raw,m) -> |
630 | -- TODO ignore unknown messages at all? | 594 | case envelopeClass m of |
631 | #ifdef VERSION_bencoding | 595 | Query meth -> handleQuery meth (raw::raw) m addr |
632 | Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage) | 596 | Response _ -> handleResponse raw (Right m) addr |
633 | #else | 597 | Error e -> handleResponse raw (Left e) addr |
634 | Left _ -> return () -- TODO TOX send unknownMessage error | 598 | |
635 | #endif | ||
636 | Right (raw,m) -> handleMessage raw m addr | ||
637 | again | 599 | again |
638 | where | 600 | where |
639 | exceptions :: IOError -> IO (BS.ByteString, SockAddr) | 601 | exceptions :: IOError -> IO (BS.ByteString, SockAddr) |
@@ -644,11 +606,17 @@ listener = do | |||
644 | 606 | ||
645 | -- | Should be run before any 'query', otherwise they will never | 607 | -- | Should be run before any 'query', otherwise they will never |
646 | -- succeed. | 608 | -- succeed. |
647 | listen :: MonadKRPC h m => m () | 609 | listen :: ( MonadKRPC h m raw msg |
648 | listen = do | 610 | , WireFormat raw msg |
611 | , Ord (TransactionID msg) | ||
612 | , Eq (QueryMethod msg) | ||
613 | , Show (QueryMethod msg) | ||
614 | , Serialize (TransactionID msg) | ||
615 | ) => Protocol raw msg -> m () | ||
616 | listen p = do | ||
649 | Manager {..} <- getManager | 617 | Manager {..} <- getManager |
650 | tid <- fork $ do | 618 | tid <- fork $ do |
651 | myThreadId >>= liftIO . flip labelThread "KRPC.listen" | 619 | myThreadId >>= liftIO . flip labelThread "KRPC.listen" |
652 | listener `Lifted.finally` | 620 | listener p `Lifted.finally` |
653 | liftIO (takeMVar listenerThread) | 621 | liftIO (takeMVar listenerThread) |
654 | liftIO $ putMVar listenerThread tid | 622 | liftIO $ putMVar listenerThread tid |