summaryrefslogtreecommitdiff
path: root/Control/Concurrent/STM/UpdateStream.hs
blob: a92168c01d93bad95f2221d920b4844de737752e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
---------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.STM.UpdateStream
--
--
-- Maintainer  :  joe@jerkface.net
-- Stability   :  experimental
--
-- An UpdateSream consists of pass-through messages that are queued up and 
-- slotted messages which might be obsoleted and discarded if they are not
-- consumed before newer slotted messages with the same slot value occur.
-- 
-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is
-- recommended that you read that documentation first.
--
-- For example, the output
--
-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here)
-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here)
--
-- is produced by the following code:
--
-- > import Control.Monad      (when, forever, (>=>))
-- > import Control.Monad.STM  (atomically)
-- > import Control.Concurrent (forkIO, threadDelay)
-- > import System.IO          (hFlush, stdout)
-- > import qualified Control.Concurrent.STM.UpdateStream as Cache
-- > 
-- > messages :: [(Maybe String, Char)]
-- > messages = concat
-- >     [ slot "x" "(set x = 2)"
-- >     , message  "(Hello)"
-- >     , slot "y" "(set y = 23)"
-- >     , message  "(Joe)"
-- >     , slot "y" "(set y = 100)"
-- >     , message  "(was)"
-- >     , slot "x" "(set x = 5)"
-- >     , message  "(here)"
-- >     ]
-- >  where
-- >     slot v cs = map ((,) (Just v)) cs
-- >     message cs = map ((,) Nothing) cs
-- > 
-- > main = do
-- >     q <- atomically $ Cache.new (== '(') (==')')
-- >     let go = mapM_ (atomically . (uncurry $ Cache.push q)
-- >                      >=> const (threadDelay 10000))
-- >                    messages
-- >         slowly = do
-- >             while (atomically $ fmap not $ Cache.isEmpty q) $ do
-- >                 c <- atomically $ Cache.pull q
-- >                 putChar c
-- >             putStrLn ""
-- >             hFlush stdout
-- >                           where while pred body =
-- >                                  pred >>= flip when (body >> while pred body)
-- >         quickly = forkIO . forever $ do
-- >                     c <- atomically $ Cache.pull q
-- >                     putChar c
-- >                     hFlush stdout
-- >     putStr $ "Backlogged consumer: "
-- >     go >> slowly
-- >     putStr "Fast consumer: "
-- >     quickly >> go
-- >     putStrLn ""
--
module Control.Concurrent.STM.UpdateStream
    ( UpdateStream
    , new
    , push
    , pull
    , isStopper
    , isStarter
    , isEmpty
    ) where

import Control.Monad
import Control.Monad.STM
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.StatusCache (StatusCache)
import qualified Control.Concurrent.STM.StatusCache as Status
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Maybe
import Data.Foldable (foldlM)

data UpdateStream slot x =
    UpdateStream { cache :: TVar (Map slot (StatusCache x))
                 , events :: TChan x
                 , inMessage :: TVar (Maybe slot)
                 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
                 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
                 }

-- | @new isStart isStop@
--
-- The @isStart@ and @isStop@ predicates indicate when an element
-- begins or ends a message.
new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x)
new isStart isStop = do
    cache <- newTVar Map.empty
    events <- newTChan
    inMessage <- newTVar Nothing
    return UpdateStream { cache = cache
                        , events = events
                        , inMessage = inMessage
                        , isStarter = isStart
                        , isStopper = isStop
                        }

-- | Enqueue a chunk into the 'UpdateStream'
--
-- If a slot is provided, then the message may be obsoleted when a new message
-- starts with the same slot value.  Otherwise, the chunk is preserved.  Note
-- that the same slot value must be passed again for every chunk of the message
-- as it is not remembered.
push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM ()
push u Nothing x = writeTChan (events u) x
push u (Just n) x = do
    smap <- readTVar (cache u)
    scache <-
        case Map.lookup n smap of
            Just scache -> return scache
            Nothing -> do
                scache <- Status.new (isStarter u) (isStopper u)
                modifyTVar (cache u) (Map.insert n scache)
                return scache

    Status.push scache x

-- | Pull off a chunk from the 'UpdateStream' for processing.
-- 
-- If chunks are not pulled off quickly, they may be obsoleted
-- and discarded when new messages are 'push'ed.
pull :: Ord slot => UpdateStream slot x -> STM x
pull u = do
    (inm, mbscache) <- do
        mn <- readTVar $ inMessage u
        map <- readTVar (cache u)
        return $ (isJust mn, mn >>= flip Map.lookup map)
    let action =
         case (inm,mbscache) of
            (True,Just scache) -> Status.pull scache
            (True,Nothing)     -> readTChan (events u)
            (False,_) -> do
                cs <- fmap (Map.toList) $ readTVar (cache u)
                cs <- filterM (return . not <=< Status.isEmpty . snd)
                              cs
                maybe (readTChan $ events u)
                      (\(n,scache) -> do
                            writeTVar (inMessage u) (Just n)
                            Status.pull scache)
                      (listToMaybe cs)
    x <- action
    when (isStopper u x) $ writeTVar (inMessage u) Nothing
    return x

-- | True when the 'UpdateStream' is completely exhuasted.
isEmpty :: UpdateStream slot x -> STM Bool
isEmpty q = do
    e <- isEmptyTChan (events q)
    qs <- readTVar (cache q)
    d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs
    return $ e && d