1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
{-# LANGUAGE OverloadedStrings #-}
import System.Posix.Daemonize
import Control.Concurrent
import System.Posix.Syslog
import System.Posix.Signals
import System.Posix.User (getEffectiveUserID)
import KikiD.PortServer
import KikiD.Multiplex
import KikiD.Message
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.Monoid
import qualified Data.ByteString.Char8 as B
import Data.Serialize
import qualified Data.Map as M
-- TODO: Set this in config file

-- | Port number handed to 'createTCPPortListener'.
port = 9800

-- | Connection limit handed to 'createTCPPortListener'
-- (presumably the maximum number of simultaneous clients -- confirm
-- against KikiD.PortServer).
max_conns = 100

-- | Capacity used for every bounded queue created with 'newTBMQueueIO'.
qsize = 20

-- | Polling delay in microseconds (half a second); it is fed to
-- 'threadDelay' by the worker loops.
qdelay = 500000
-- | The no-op action, used to fill the "nothing to do" branch of a
-- @case@ or @if@.
doNothing :: IO ()
doNothing = return ()
-- | Entry point: refuse to run unless the effective user is root,
-- otherwise hand control to 'serviced' with the daemon description.
main :: IO ()
main = do
  uid <- getEffectiveUserID
  if uid /= 0
    then putStrLn "Kiki Daemon must be run as root."
    else serviced rootDaemon
  where
    -- 'simpleDaemon' with our startup hook and main program plugged in.
    rootDaemon = simpleDaemon
      { privilegedAction = startupAsRoot
      , program = kikidMain
      , user = Just "root"
      , group = Just "root"
      }
-- | Privileged start-up hook installed in the daemon record by 'main';
-- it only writes a start-up notice to syslog.
startupAsRoot :: IO ()
startupAsRoot = syslog Notice "kikid Started."
-- | Daemon body, installed as the @program@ field of the daemon record.
--
-- Runs a "refresh" loop: each pass creates fresh queues and a fresh
-- client table, installs the signal handlers, starts the four worker
-- threads and blocks until the first of them finishes.
--
-- * @SIGTERM@ clears the refresh flag and closes both queues, so the
--   loop ends after the current pass.
-- * @SIGHUP@ only closes the queues, so the loop starts a new pass.
--
-- The argument (whatever the privileged action returned) is ignored.
kikidMain :: a -> IO ()
kikidMain _ = do
  refreshFlag <- newTVarIO True
  whileM_ (readTVarIO refreshFlag) $ do
    incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
    newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
    -- The key type is the 'ClientID' alias; a lowercase spelling here
    -- would be an unconstrained type variable and document nothing.
    currentClients <- newTVarIO M.empty :: IO (TVar (M.Map ClientID ClientState))
    let timeToQuit = atomically (isClosedTBMQueue newchans)
        -- Close both queues in one transaction; the worker loops in
        -- this file poll the closed state to know when to stop.
        closeQueues = atomically $ do
          closeTBMQueue newchans
          closeTBMQueue incomming
    void $ installHandler sigTERM
      (Catch $ do
         syslog Notice "Caught sigTERM: Shutting down."
         atomically $ writeTVar refreshFlag False
         closeQueues)
      (Just fullSignalSet)
    void $ installHandler sigHUP
      (Catch $ do
         syslog Notice "Caught sigHUP: Refreshing.."
         closeQueues)
      (Just fullSignalSet)
    -- NOTE(review): only the first worker to finish is awaited; the
    -- others are neither cancelled nor joined before the next pass.
    -- Confirm that 'createTCPPortListener' releases its socket once the
    -- queues are closed, otherwise a refresh may race the old listener.
    (_, outcome) <- waitAnyCatch =<< mapM async
      [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage
      , addOpenConnections newchans currentClients
      , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients)
      , purgeClosedConnections timeToQuit currentClients
      ]
    case (outcome :: Either SomeException ()) of
      Left e -> syslog Notice ("Exception: " <> show e)
      Right _ -> doNothing
    -- Make sure the queues are closed even when a worker ended on its
    -- own rather than because of a signal.
    closeQueues
-- | Per-client bookkeeping kept in the table of current clients.
data ClientState = CState
  { cs_queue :: TBMQueue KikiDMessage -- ^ the queue registered for this client
  }

-- | Clients are identified by the thread that serves them.
type ClientID = ThreadId

-- | Conversion from a thread id to a client key; currently the identity.
threadIdToClient = id
-- | Register newly accepted connections in the client table.
--
-- Until @newchans@ is closed, repeatedly drains every pending
-- @(thread, queue)@ pair from it, logs the connection, inserts it into
-- @currentClients@ keyed by the thread id, then sleeps for 'qdelay'
-- microseconds.
addOpenConnections
  :: TBMQueue (ThreadId, TBMQueue KikiDMessage)
  -> TVar (M.Map ClientID ClientState)
  -> IO ()
addOpenConnections newchans currentClients =
  whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
    whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
      pending <- atomically $ readTBMQueue newchans
      case pending of
        Just (newClientThread, clientQ) -> do -- Is clientQ input or output?
          syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
          -- Strict update so thunks do not pile up inside the shared TVar.
          atomically $
            modifyTVar' currentClients
              (M.insert (threadIdToClient newClientThread) (CState clientQ))
        Nothing -> doNothing
    threadDelay qdelay
-- | Periodically drop clients whose queue has been closed.
--
-- Loops until the @quit@ action yields 'True'. Each pass inspects every
-- entry of @currentClients@, removes those whose queue is closed, logs
-- the removed entries, and then sleeps for 'qdelay' microseconds.
--
-- The inspection and the removal happen inside a single STM
-- transaction, so a client inserted concurrently (see
-- 'addOpenConnections') can never be overwritten by a stale snapshot.
purgeClosedConnections :: IO Bool -> TVar (M.Map ClientID ClientState) -> IO ()
purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
  closing <- atomically $ do
    clients <- readTVar currentClients
    -- Pair every client id with the closed state of its queue.
    flagged <- forM (M.toList clients) $ \(cid, CState q) -> do
      closed <- isClosedTBMQueue q
      return (closed, cid)
    let dead = filter fst flagged
    unless (null dead) $
      writeTVar currentClients (foldr (M.delete . snd) clients dead)
    return dead
  unless (null closing) $
    syslog Notice ("Closing connections: " ++ show closing)
  threadDelay qdelay
-- | Per-connection handler passed to 'createTCPPortListener'.
--
-- Reads one line from the handle, tries to decode it as a
-- 'KikiDMessage', and logs the outcome together with the current thread
-- id. The second argument is not used yet.
handleMessage hdl outq = do
  line <- B.hGetLine hdl
  tid <- myThreadId
  let onSuccess _ =
        syslog Notice ("Message decoded on thread=" <> show tid)
      onFailure reason = do
        syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
        syslog Notice reason
  either onFailure onSuccess (decode line :: Either String KikiDMessage)
-- | Consumer callback for the incoming-message queue: for now it only
-- logs the message. The client table argument is not used yet.
consumeMessage currentClients msg =
  void . syslog Notice $ "Recieved Message: " ++ show msg
-- TODO: Do more here...
|