diff options
author | James Crayne <jim.crayne@gmail.com> | 2015-06-21 07:36:46 -0400 |
---|---|---|
committer | James Crayne <jim.crayne@gmail.com> | 2015-06-21 15:35:26 -0400 |
commit | d420aefe0b14081b00d4655f9e8317903b5b02c3 (patch) | |
tree | 7b58c8a784d1ef23be05e62aaf0c8311353d57d5 /KikiD/Multiplex.hs | |
parent | 75d4b3b8bc60ff1d7ccc3990833a4439c0af02bc (diff) |
kikid: The Beginnings of the Kiki Daemon
Diffstat (limited to 'KikiD/Multiplex.hs')
-rw-r--r-- | KikiD/Multiplex.hs | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs new file mode 100644 index 0000000..4a31127 --- /dev/null +++ b/KikiD/Multiplex.hs | |||
@@ -0,0 +1,100 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE ViewPatterns #-} | ||
3 | {-# LANGUAGE TupleSections #-} | ||
4 | {-# LANGUAGE StandaloneDeriving #-} | ||
5 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
6 | {-# LANGUAGE DeriveGeneric #-} | ||
7 | {-# LANGUAGE BangPatterns #-} | ||
8 | module KikiD.Multiplex where | ||
9 | |||
10 | import System.IO | ||
11 | import qualified Data.ByteString.Char8 as B | ||
12 | import Data.Monoid | ||
13 | import Control.Concurrent.STM | ||
14 | import Data.Map.Strict as M | ||
15 | import Control.Monad | ||
16 | import Control.Concurrent | ||
17 | import qualified Data.Binary as Bin | ||
18 | import Control.Concurrent.STM.TBMQueue | ||
19 | import Control.Monad.Loops | ||
20 | import Data.List | ||
21 | import Data.Maybe | ||
22 | |||
23 | -- | pipeTransHookMicroseconds | ||
24 | -- | ||
25 | -- This function indefinitely reads the @fromChan@ queue and applies | ||
26 | -- the function @translate@ to the contents before passing it on to the | ||
27 | -- @toChan@ queue. The @triggerAction@ is performed on the message prior | ||
28 | -- to the translation. The @fromChan@ queue is checked every @micros@ | ||
29 | -- microseconds from the last emptying. | ||
30 | -- | ||
31 | -- To terminate the thread, close @fromChan@ queue. | ||
32 | -- | ||
33 | pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO () | ||
34 | pipeTransHookMicroseconds fromChan toChan micros translate triggerAction = | ||
35 | whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do | ||
36 | whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do | ||
37 | msg <- atomically $ readTBMQueue fromChan | ||
38 | case msg of | ||
39 | Just m' -> do | ||
40 | x <- triggerAction m' | ||
41 | case translate x m' of | ||
42 | Just m -> atomically $ writeTBMQueue toChan m | ||
43 | _ -> return () | ||
44 | _ -> return () | ||
45 | threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads | ||
46 | |||
47 | pipeTransHook fromChan toChan translate triggerAction = | ||
48 | pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction | ||
49 | |||
50 | pipeTrans fromChan toChan translate = | ||
51 | pipeTransHook fromChan toChan translate (void . return) | ||
52 | |||
53 | pipeHook fromChan toChan triggerAction = | ||
54 | pipeTransHook fromChan toChan id triggerAction | ||
55 | |||
56 | pipeQueue fromChan toChan = | ||
57 | pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return) | ||
58 | |||
59 | teePipeQueueMicroseconds fromChan toChan1 toChan2 micros = | ||
60 | whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do | ||
61 | whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do | ||
62 | msg <- atomically $ readTBMQueue fromChan | ||
63 | case msg of | ||
64 | Just m -> do | ||
65 | atomically $ writeTBMQueue toChan1 m | ||
66 | atomically $ writeTBMQueue toChan2 m | ||
67 | _ -> return () | ||
68 | threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads | ||
69 | |||
70 | teePipeQueue fromChan toChan1 toChan2 = | ||
71 | teePipeQueueMicroseconds fromChan toChan1 toChan2 5000 | ||
72 | |||
73 | |||
74 | -- Deprecated: Use consumeQueueMicroseconds | ||
75 | -- TODO: Remove this | ||
76 | withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do | ||
77 | whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do | ||
78 | t <- atomically $ readTBMQueue fromChan | ||
79 | case t of | ||
80 | Just x -> action x | ||
81 | Nothing -> return () | ||
82 | threadDelay delay | ||
83 | |||
84 | {-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-} | ||
85 | withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action | ||
86 | {-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-} | ||
87 | |||
88 | -- | consumeQueueMicroseconds | ||
89 | -- (as of version 1.0.4) | ||
90 | -- | ||
91 | -- Continously run the provided action on items | ||
92 | -- from the provided queue. Delay for provided | ||
93 | -- microseconds each time the queue is emptied. | ||
94 | consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do | ||
95 | whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do | ||
96 | x <- atomically $ readTBMQueue q | ||
97 | case x of | ||
98 | Just s -> action s | ||
99 | Nothing -> return () | ||
100 | threadDelay micros | ||