summaryrefslogtreecommitdiff
path: root/KikiD/Multiplex.hs
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2015-06-21 07:36:46 -0400
committerJames Crayne <jim.crayne@gmail.com>2015-06-21 15:35:26 -0400
commitd420aefe0b14081b00d4655f9e8317903b5b02c3 (patch)
tree7b58c8a784d1ef23be05e62aaf0c8311353d57d5 /KikiD/Multiplex.hs
parent75d4b3b8bc60ff1d7ccc3990833a4439c0af02bc (diff)
kikid: The Beginnings of the Kiki Daemon
Diffstat (limited to 'KikiD/Multiplex.hs')
-rw-r--r--KikiD/Multiplex.hs100
1 files changed, 100 insertions, 0 deletions
diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs
new file mode 100644
index 0000000..4a31127
--- /dev/null
+++ b/KikiD/Multiplex.hs
@@ -0,0 +1,100 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE ViewPatterns #-}
3{-# LANGUAGE TupleSections #-}
4{-# LANGUAGE StandaloneDeriving #-}
5{-# LANGUAGE GeneralizedNewtypeDeriving #-}
6{-# LANGUAGE DeriveGeneric #-}
7{-# LANGUAGE BangPatterns #-}
8module KikiD.Multiplex where
9
10import System.IO
11import qualified Data.ByteString.Char8 as B
12import Data.Monoid
13import Control.Concurrent.STM
14import Data.Map.Strict as M
15import Control.Monad
16import Control.Concurrent
17import qualified Data.Binary as Bin
18import Control.Concurrent.STM.TBMQueue
19import Control.Monad.Loops
20import Data.List
21import Data.Maybe
22
23-- | pipeTransHookMicroseconds
24--
25-- This function indefinitely reads the @fromChan@ queue and applies
26-- the function @translate@ to the contents before passing it on to the
27-- @toChan@ queue. The @triggerAction@ is performed on the message prior
28-- to the translation. The @fromChan@ queue is checked every @micros@
29-- microseconds from the last emptying.
30--
31-- To terminate the thread, close @fromChan@ queue.
32--
33pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO ()
34pipeTransHookMicroseconds fromChan toChan micros translate triggerAction =
35 whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do
36 whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do
37 msg <- atomically $ readTBMQueue fromChan
38 case msg of
39 Just m' -> do
40 x <- triggerAction m'
41 case translate x m' of
42 Just m -> atomically $ writeTBMQueue toChan m
43 _ -> return ()
44 _ -> return ()
45 threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads
46
47pipeTransHook fromChan toChan translate triggerAction =
48 pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction
49
50pipeTrans fromChan toChan translate =
51 pipeTransHook fromChan toChan translate (void . return)
52
53pipeHook fromChan toChan triggerAction =
54 pipeTransHook fromChan toChan id triggerAction
55
56pipeQueue fromChan toChan =
57 pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return)
58
59teePipeQueueMicroseconds fromChan toChan1 toChan2 micros =
60 whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do
61 whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do
62 msg <- atomically $ readTBMQueue fromChan
63 case msg of
64 Just m -> do
65 atomically $ writeTBMQueue toChan1 m
66 atomically $ writeTBMQueue toChan2 m
67 _ -> return ()
68 threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads
69
70teePipeQueue fromChan toChan1 toChan2 =
71 teePipeQueueMicroseconds fromChan toChan1 toChan2 5000
72
73
74-- Deprecated: Use consumeQueueMicroseconds
75-- TODO: Remove this
76withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do
77 whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do
78 t <- atomically $ readTBMQueue fromChan
79 case t of
80 Just x -> action x
81 Nothing -> return ()
82 threadDelay delay
83
84{-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-}
85withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action
86{-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-}
87
88-- | consumeQueueMicroseconds
89-- (as of version 1.0.4)
90--
91-- Continously run the provided action on items
92-- from the provided queue. Delay for provided
93-- microseconds each time the queue is emptied.
94consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do
95 whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do
96 x <- atomically $ readTBMQueue q
97 case x of
98 Just s -> action s
99 Nothing -> return ()
100 threadDelay micros