diff options
author | joe <joe@jerkface.net> | 2017-01-18 21:24:38 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-01-18 21:24:38 -0500 |
commit | 1d7dd944e0a13d3f09b65f7629d1f96098ea7974 (patch) | |
tree | 6c02f4d9d6e95f9a2d596c1854d5938daeeeddcc /src/Network/KRPC/Manager.hs | |
parent | 3c9e37d4f349ba2b4395cb10b5a3671decf89d68 (diff) | |
parent | a8498921ddf37e864968a3865e3e254352b5d285 (diff) |
Merge branch 'krpc' into dht-only
Diffstat (limited to 'src/Network/KRPC/Manager.hs')
-rw-r--r-- | src/Network/KRPC/Manager.hs | 485 |
1 files changed, 485 insertions, 0 deletions
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs new file mode 100644 index 00000000..c90c92f9 --- /dev/null +++ b/src/Network/KRPC/Manager.hs | |||
@@ -0,0 +1,485 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013, 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- Normally, you don't need to import this module. | ||
9 | -- | ||
10 | {-# LANGUAGE OverloadedStrings #-} | ||
11 | {-# LANGUAGE FlexibleInstances #-} | ||
12 | {-# LANGUAGE FlexibleContexts #-} | ||
13 | {-# LANGUAGE ScopedTypeVariables #-} | ||
14 | {-# LANGUAGE DefaultSignatures #-} | ||
15 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
16 | {-# LANGUAGE FunctionalDependencies #-} | ||
17 | {-# LANGUAGE DeriveDataTypeable #-} | ||
18 | {-# LANGUAGE TemplateHaskell #-} | ||
19 | module Network.KRPC.Manager | ||
20 | ( -- * Manager | ||
21 | MonadKRPC (..) | ||
22 | , Options (..) | ||
23 | , Manager | ||
24 | , newManager | ||
25 | , closeManager | ||
26 | , withManager | ||
27 | , isActive | ||
28 | , listen | ||
29 | |||
30 | -- * Queries | ||
31 | , QueryFailure (..) | ||
32 | , query | ||
33 | , query' | ||
34 | , queryRaw | ||
35 | , getQueryCount | ||
36 | |||
37 | -- * Handlers | ||
38 | , HandlerFailure (..) | ||
39 | , Handler | ||
40 | , handler | ||
41 | ) where | ||
42 | |||
43 | import Control.Applicative | ||
44 | import Control.Concurrent | ||
45 | import Control.Concurrent.Lifted (fork) | ||
46 | import Control.Exception hiding (Handler) | ||
47 | import qualified Control.Exception.Lifted as E (Handler (..)) | ||
48 | import Control.Exception.Lifted as Lifted (catches, finally) | ||
49 | import Control.Monad | ||
50 | import Control.Monad.Logger | ||
51 | import Control.Monad.Reader | ||
52 | import Control.Monad.Trans.Control | ||
53 | import Data.BEncode as BE | ||
54 | import Data.BEncode.Internal as BE | ||
55 | import Data.BEncode.Pretty (showBEncode) | ||
56 | import Data.ByteString as BS | ||
57 | import Data.ByteString.Char8 as BC | ||
58 | import Data.ByteString.Lazy as BL | ||
59 | import Data.Default.Class | ||
60 | import Data.IORef | ||
61 | import Data.List as L | ||
62 | import Data.Map as M | ||
63 | import Data.Monoid | ||
64 | import Data.Text as T | ||
65 | import Data.Text.Encoding as T | ||
66 | import Data.Tuple | ||
67 | import Data.Typeable | ||
68 | import Network.KRPC.Message | ||
69 | import Network.KRPC.Method | ||
70 | import Network.Socket hiding (listen) | ||
71 | import Network.Socket.ByteString as BS | ||
72 | import System.IO.Error | ||
73 | import System.Timeout | ||
74 | |||
75 | |||
76 | {----------------------------------------------------------------------- | ||
77 | -- Options | ||
78 | -----------------------------------------------------------------------} | ||
79 | |||
80 | -- | RPC manager options. | ||
81 | data Options = Options | ||
82 | { -- | Initial 'TransactionId' incremented with each 'query'; | ||
83 | optSeedTransaction :: {-# UNPACK #-} !Int | ||
84 | |||
85 | -- | Time to wait for response from remote node, in seconds. | ||
86 | , optQueryTimeout :: {-# UNPACK #-} !Int | ||
87 | |||
88 | -- | Maximum number of bytes to receive. | ||
89 | , optMaxMsgSize :: {-# UNPACK #-} !Int | ||
90 | } deriving (Show, Eq) | ||
91 | |||
92 | defaultSeedTransaction :: Int | ||
93 | defaultSeedTransaction = 0 | ||
94 | |||
95 | defaultQueryTimeout :: Int | ||
96 | defaultQueryTimeout = 120 | ||
97 | |||
98 | defaultMaxMsgSize :: Int | ||
99 | defaultMaxMsgSize = 64 * 1024 | ||
100 | |||
101 | -- | Permissive defaults. | ||
102 | instance Default Options where | ||
103 | def = Options | ||
104 | { optSeedTransaction = defaultSeedTransaction | ||
105 | , optQueryTimeout = defaultQueryTimeout | ||
106 | , optMaxMsgSize = defaultMaxMsgSize | ||
107 | } | ||
108 | |||
109 | validateOptions :: Options -> IO () | ||
110 | validateOptions Options {..} | ||
111 | | optQueryTimeout < 1 | ||
112 | = throwIO (userError "krpc: non-positive query timeout") | ||
113 | | optMaxMsgSize < 1 | ||
114 | = throwIO (userError "krpc: non-positive buffer size") | ||
115 | | otherwise = return () | ||
116 | |||
117 | {----------------------------------------------------------------------- | ||
118 | -- Options | ||
119 | -----------------------------------------------------------------------} | ||
120 | |||
121 | type KResult = Either KError KResponse | ||
122 | |||
123 | type TransactionCounter = IORef Int | ||
124 | type CallId = (TransactionId, SockAddr) | ||
125 | type CallRes = MVar (BValue, KResult) | ||
126 | type PendingCalls = IORef (Map CallId CallRes) | ||
127 | |||
128 | type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue) | ||
129 | |||
130 | -- | Handler is a function which will be invoked then some /remote/ | ||
131 | -- node querying /this/ node. | ||
132 | type Handler h = (MethodName, HandlerBody h) | ||
133 | |||
134 | -- | Keep track pending queries made by /this/ node and handle queries | ||
135 | -- made by /remote/ nodes. | ||
136 | data Manager h = Manager | ||
137 | { sock :: !Socket | ||
138 | , options :: !Options | ||
139 | , listenerThread :: !(MVar ThreadId) | ||
140 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | ||
141 | , pendingCalls :: {-# UNPACK #-} !PendingCalls | ||
142 | , handlers :: [Handler h] | ||
143 | } | ||
144 | |||
145 | -- | A monad which can perform or handle queries. | ||
146 | class (MonadBaseControl IO m, MonadLogger m, MonadIO m) | ||
147 | => MonadKRPC h m | m -> h where | ||
148 | |||
149 | -- | Ask for manager. | ||
150 | getManager :: m (Manager h) | ||
151 | |||
152 | default getManager :: MonadReader (Manager h) m => m (Manager h) | ||
153 | getManager = ask | ||
154 | |||
155 | -- | Can be used to add logging for instance. | ||
156 | liftHandler :: h a -> m a | ||
157 | |||
158 | default liftHandler :: m a -> m a | ||
159 | liftHandler = id | ||
160 | |||
161 | instance (MonadBaseControl IO h, MonadLogger h, MonadIO h) | ||
162 | => MonadKRPC h (ReaderT (Manager h) h) where | ||
163 | |||
164 | liftHandler = lift | ||
165 | |||
166 | sockAddrFamily :: SockAddr -> Family | ||
167 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
168 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
169 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
170 | sockAddrFamily (SockAddrCan _ ) = AF_CAN | ||
171 | |||
172 | -- | Bind socket to the specified address. To enable query handling | ||
173 | -- run 'listen'. | ||
174 | newManager :: Options -- ^ various protocol options; | ||
175 | -> SockAddr -- ^ address to listen on; | ||
176 | -> [Handler h] -- ^ handlers to run on incoming queries. | ||
177 | -> IO (Manager h) -- ^ new rpc manager. | ||
178 | newManager opts @ Options {..} servAddr handlers = do | ||
179 | validateOptions opts | ||
180 | sock <- bindServ | ||
181 | tref <- newEmptyMVar | ||
182 | tran <- newIORef optSeedTransaction | ||
183 | calls <- newIORef M.empty | ||
184 | return $ Manager sock opts tref tran calls handlers | ||
185 | where | ||
186 | bindServ = do | ||
187 | let family = sockAddrFamily servAddr | ||
188 | sock <- socket family Datagram defaultProtocol | ||
189 | when (family == AF_INET6) $ do | ||
190 | setSocketOption sock IPv6Only 0 | ||
191 | bindSocket sock servAddr | ||
192 | return sock | ||
193 | |||
194 | -- | Unblock all pending calls and close socket. | ||
195 | closeManager :: Manager m -> IO () | ||
196 | closeManager Manager {..} = do | ||
197 | maybe (return ()) killThread =<< tryTakeMVar listenerThread | ||
198 | -- TODO unblock calls | ||
199 | close sock | ||
200 | |||
201 | -- | Check if the manager is still active. Manager becomes active | ||
202 | -- until 'closeManager' called. | ||
203 | isActive :: Manager m -> IO Bool | ||
204 | isActive Manager {..} = liftIO $ isBound sock | ||
205 | {-# INLINE isActive #-} | ||
206 | |||
207 | -- | Normally you should use Control.Monad.Trans.Resource.allocate | ||
208 | -- function. | ||
209 | withManager :: Options -> SockAddr -> [Handler h] | ||
210 | -> (Manager h -> IO a) -> IO a | ||
211 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager | ||
212 | |||
213 | {----------------------------------------------------------------------- | ||
214 | -- Logging | ||
215 | -----------------------------------------------------------------------} | ||
216 | |||
217 | -- TODO prettify log messages | ||
218 | querySignature :: MethodName -> TransactionId -> SockAddr -> Text | ||
219 | querySignature name transaction addr = T.concat | ||
220 | [ "&", T.decodeUtf8 name | ||
221 | , " #", T.decodeUtf8 transaction | ||
222 | , " @", T.pack (show addr) | ||
223 | ] | ||
224 | |||
225 | {----------------------------------------------------------------------- | ||
226 | -- Client | ||
227 | -----------------------------------------------------------------------} | ||
228 | -- we don't need to know about TransactionId while performing query, | ||
229 | -- so we introduce QueryFailure exceptions | ||
230 | |||
231 | -- | Used to signal 'query' errors. | ||
232 | data QueryFailure | ||
233 | = SendFailed -- ^ unable to send query; | ||
234 | | QueryFailed ErrorCode Text -- ^ remote node return error; | ||
235 | | TimeoutExpired -- ^ remote node not responding. | ||
236 | deriving (Show, Eq, Typeable) | ||
237 | |||
238 | instance Exception QueryFailure | ||
239 | |||
240 | sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () | ||
241 | sendMessage sock addr a = do | ||
242 | liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr | ||
243 | |||
244 | genTransactionId :: TransactionCounter -> IO TransactionId | ||
245 | genTransactionId ref = do | ||
246 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) | ||
247 | return $ BC.pack (show cur) | ||
248 | |||
249 | -- | How many times 'query' call have been performed. | ||
250 | getQueryCount :: MonadKRPC h m => m Int | ||
251 | getQueryCount = do | ||
252 | Manager {..} <- getManager | ||
253 | curTrans <- liftIO $ readIORef transactionCounter | ||
254 | return $ curTrans - optSeedTransaction options | ||
255 | |||
256 | registerQuery :: CallId -> PendingCalls -> IO CallRes | ||
257 | registerQuery cid ref = do | ||
258 | ares <- newEmptyMVar | ||
259 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) | ||
260 | return ares | ||
261 | |||
262 | -- simultaneous M.lookup and M.delete guarantees that we never get two | ||
263 | -- or more responses to the same query | ||
264 | unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) | ||
265 | unregisterQuery cid ref = do | ||
266 | atomicModifyIORef' ref $ swap . | ||
267 | M.updateLookupWithKey (const (const Nothing)) cid | ||
268 | |||
269 | |||
270 | -- (sendmsg EINVAL) | ||
271 | sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO () | ||
272 | sendQuery sock addr q = handle sockError $ sendMessage sock addr q | ||
273 | where | ||
274 | sockError :: IOError -> IO () | ||
275 | sockError _ = throwIO SendFailed | ||
276 | |||
277 | -- | Enqueue query to the given node. | ||
278 | -- | ||
279 | -- This function should throw 'QueryFailure' exception if quered node | ||
280 | -- respond with @error@ message or the query timeout expires. | ||
281 | -- | ||
282 | query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b | ||
283 | query addr params = queryK addr params (\_ x _ -> x) | ||
284 | |||
285 | -- | Like 'query' but possibly returns your externally routable IP address. | ||
286 | query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP) | ||
287 | query' addr params = queryK addr params (const (,)) | ||
288 | |||
289 | -- | Enqueue a query, but give us the complete BEncoded content sent by the | ||
290 | -- remote Node. This is useful for handling extensions that this library does | ||
291 | -- not otherwise support. | ||
292 | queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, BValue) | ||
293 | queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) | ||
294 | |||
295 | queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => | ||
296 | SockAddr -> a -> (BValue -> b -> Maybe ReflectedIP -> x) -> m x | ||
297 | queryK addr params kont = do | ||
298 | Manager {..} <- getManager | ||
299 | tid <- liftIO $ genTransactionId transactionCounter | ||
300 | let queryMethod = method :: Method a b | ||
301 | let signature = querySignature (methodName queryMethod) tid addr | ||
302 | $(logDebugS) "query.sending" signature | ||
303 | |||
304 | mres <- liftIO $ do | ||
305 | ares <- registerQuery (tid, addr) pendingCalls | ||
306 | |||
307 | let q = KQuery (toBEncode params) (methodName queryMethod) tid | ||
308 | sendQuery sock addr q | ||
309 | `onException` unregisterQuery (tid, addr) pendingCalls | ||
310 | |||
311 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do | ||
312 | (raw,res) <- readMVar ares | ||
313 | case res of | ||
314 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) | ||
315 | Right (KResponse {..}) -> | ||
316 | case fromBEncode respVals of | ||
317 | Right r -> pure $ kont raw r respIP | ||
318 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) | ||
319 | |||
320 | case mres of | ||
321 | Just res -> do | ||
322 | $(logDebugS) "query.responded" $ signature | ||
323 | return res | ||
324 | |||
325 | Nothing -> do | ||
326 | _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls | ||
327 | $(logWarnS) "query.not_responding" $ signature <> " for " <> | ||
328 | T.pack (show (optQueryTimeout options)) <> " seconds" | ||
329 | throw $ TimeoutExpired | ||
330 | |||
331 | {----------------------------------------------------------------------- | ||
332 | -- Handlers | ||
333 | -----------------------------------------------------------------------} | ||
334 | -- we already throw: | ||
335 | -- | ||
336 | -- * ErrorCode(MethodUnknown) in the 'dispatchHandler'; | ||
337 | -- | ||
338 | -- * ErrorCode(ServerError) in the 'runHandler'; | ||
339 | -- | ||
340 | -- * ErrorCode(GenericError) in the 'runHandler' (those can be | ||
341 | -- async exception too) | ||
342 | -- | ||
343 | -- so HandlerFailure should cover *only* 'ProtocolError's. | ||
344 | |||
345 | -- | Used to signal protocol errors. | ||
346 | data HandlerFailure | ||
347 | = BadAddress -- ^ for e.g.: node calls herself; | ||
348 | | InvalidParameter Text -- ^ for e.g.: bad session token. | ||
349 | deriving (Show, Eq, Typeable) | ||
350 | |||
351 | instance Exception HandlerFailure | ||
352 | |||
353 | prettyHF :: HandlerFailure -> BS.ByteString | ||
354 | prettyHF BadAddress = T.encodeUtf8 "bad address" | ||
355 | prettyHF (InvalidParameter reason) = T.encodeUtf8 $ | ||
356 | "invalid parameter: " <> reason | ||
357 | |||
358 | prettyQF :: QueryFailure -> BS.ByteString | ||
359 | prettyQF e = T.encodeUtf8 $ "handler fail while performing query: " | ||
360 | <> T.pack (show e) | ||
361 | |||
362 | -- | Make handler from handler function. Any thrown exception will be | ||
363 | -- supressed and send over the wire back to the querying node. | ||
364 | -- | ||
365 | -- If the handler make some 'query' normally it /should/ handle | ||
366 | -- corresponding 'QueryFailure's. | ||
367 | -- | ||
368 | handler :: forall h a b. (KRPC a b, Monad h) | ||
369 | => (SockAddr -> a -> h b) -> Handler h | ||
370 | handler body = (name, wrapper) | ||
371 | where | ||
372 | Method name = method :: Method a b | ||
373 | wrapper addr args = | ||
374 | case fromBEncode args of | ||
375 | Left e -> return $ Left e | ||
376 | Right a -> do | ||
377 | r <- body addr a | ||
378 | return $ Right $ toBEncode r | ||
379 | |||
380 | runHandler :: MonadKRPC h m | ||
381 | => HandlerBody h -> SockAddr -> KQuery -> m KResult | ||
382 | runHandler h addr KQuery {..} = Lifted.catches wrapper failbacks | ||
383 | where | ||
384 | signature = querySignature queryMethod queryId addr | ||
385 | |||
386 | wrapper = do | ||
387 | $(logDebugS) "handler.quered" signature | ||
388 | result <- liftHandler (h addr queryArgs) | ||
389 | |||
390 | case result of | ||
391 | Left msg -> do | ||
392 | $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg | ||
393 | return $ Left $ KError ProtocolError (BC.pack msg) queryId | ||
394 | |||
395 | Right a -> do | ||
396 | $(logDebugS) "handler.success" signature | ||
397 | return $ Right $ KResponse a queryId (Just $ ReflectedIP addr) | ||
398 | |||
399 | failbacks = | ||
400 | [ E.Handler $ \ (e :: HandlerFailure) -> do | ||
401 | $(logDebugS) "handler.failed" signature | ||
402 | return $ Left $ KError ProtocolError (prettyHF e) queryId | ||
403 | |||
404 | -- may happen if handler makes query and fail | ||
405 | , E.Handler $ \ (e :: QueryFailure) -> do | ||
406 | return $ Left $ KError ServerError (prettyQF e) queryId | ||
407 | |||
408 | -- since handler thread exit after sendMessage we can safely | ||
409 | -- suppress async exception here | ||
410 | , E.Handler $ \ (e :: SomeException) -> do | ||
411 | return $ Left $ KError GenericError (BC.pack (show e)) queryId | ||
412 | ] | ||
413 | |||
414 | dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult | ||
415 | dispatchHandler q @ KQuery {..} addr = do | ||
416 | Manager {..} <- getManager | ||
417 | case L.lookup queryMethod handlers of | ||
418 | Nothing -> return $ Left $ KError MethodUnknown queryMethod queryId | ||
419 | Just h -> runHandler h addr q | ||
420 | |||
421 | {----------------------------------------------------------------------- | ||
422 | -- Listener | ||
423 | -----------------------------------------------------------------------} | ||
424 | |||
425 | -- TODO bound amount of parallel handler *threads*: | ||
426 | -- | ||
427 | -- peer A flooding with find_node | ||
428 | -- peer B trying to ping peer C | ||
429 | -- peer B fork too many threads | ||
430 | -- ... space leak | ||
431 | -- | ||
432 | handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () | ||
433 | handleQuery raw q addr = void $ fork $ do | ||
434 | Manager {..} <- getManager | ||
435 | res <- dispatchHandler q addr | ||
436 | let resbe = either toBEncode toBEncode res | ||
437 | $(logOther "q") $ T.unlines | ||
438 | [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) | ||
439 | , "==>" | ||
440 | , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) | ||
441 | ] | ||
442 | sendMessage sock addr resbe | ||
443 | |||
444 | handleResponse :: MonadKRPC h m => BValue -> KResult -> SockAddr -> m () | ||
445 | handleResponse raw result addr = do | ||
446 | Manager {..} <- getManager | ||
447 | liftIO $ do | ||
448 | let resultId = either errorId respId result | ||
449 | mcall <- unregisterQuery (resultId, addr) pendingCalls | ||
450 | case mcall of | ||
451 | Nothing -> return () | ||
452 | Just ares -> putMVar ares (raw,result) | ||
453 | |||
454 | handleMessage :: MonadKRPC h m => BValue -> KMessage -> SockAddr -> m () | ||
455 | handleMessage raw (Q q) = handleQuery raw q | ||
456 | handleMessage raw (R r) = handleResponse raw (Right r) | ||
457 | handleMessage raw (E e) = handleResponse raw (Left e) | ||
458 | |||
459 | listener :: MonadKRPC h m => m () | ||
460 | listener = do | ||
461 | Manager {..} <- getManager | ||
462 | forever $ do | ||
463 | (bs, addr) <- liftIO $ do | ||
464 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) | ||
465 | |||
466 | case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of | ||
467 | -- TODO ignore unknown messages at all? | ||
468 | Left e -> liftIO $ sendMessage sock addr $ unknownMessage e | ||
469 | Right (raw,m) -> handleMessage raw m addr | ||
470 | where | ||
471 | exceptions :: IOError -> IO (BS.ByteString, SockAddr) | ||
472 | exceptions e | ||
473 | -- packets with empty payload may trigger eof exception | ||
474 | | isEOFError e = return ("", SockAddrInet 0 0) | ||
475 | | otherwise = throwIO e | ||
476 | |||
477 | -- | Should be run before any 'query', otherwise they will never | ||
478 | -- succeed. | ||
479 | listen :: MonadKRPC h m => m () | ||
480 | listen = do | ||
481 | Manager {..} <- getManager | ||
482 | tid <- fork $ do | ||
483 | listener `Lifted.finally` | ||
484 | liftIO (takeMVar listenerThread) | ||
485 | liftIO $ putMVar listenerThread tid | ||