1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE DoAndIfThenElse #-}
import System.Posix.Daemonize
import Control.Concurrent
import System.Posix.Syslog
import System.Posix.Signals
import System.Posix.User (getEffectiveUserID)
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.Monoid
import qualified Data.ByteString.Char8 as B
--import Data.Serialize
import qualified Data.Map as M
import qualified Data.Bytes.Serial as Bytes
import qualified Data.Bytes.Get as Bytes
import qualified Data.Bytes.Put as Bytes
import KikiD.PortServer
import KikiD.Multiplex
import KikiD.Message
import KikiD.ClientState
-- TODO: Set this in config file
-- Port number handed to 'createTCPPortListener' in 'kikidMain'.
port = 9800
-- Connection limit handed to 'createTCPPortListener'.
-- NOTE(review): presumably the maximum number of simultaneous clients —
-- confirm against KikiD.PortServer.
max_conns = 100
-- Capacity used for the bounded queues created in 'kikidMain'; also passed
-- on to 'createTCPPortListener'.
qsize = 20
-- Polling interval in microseconds (5*10^5 = half a second) used with
-- 'threadDelay' by the connection bookkeeping loops; 'kikidMain' also passes
-- it to the listener and half of it to the queue consumer.
qdelay = 5*10^5
-- No-op action for branches that have nothing to do.
doNothing = return ()
-- | Program entry point.
--
-- Checks the effective user id: when it is not 0 a message is printed and
-- nothing else happens; when it is 0 the process is handed over to
-- 'serviced' with 'startupAsRoot' as the privileged action and 'kikidMain'
-- as the program, configured with user and group @root@.
main :: IO ()
main = do
  runningAsRoot <- fmap (== 0) getEffectiveUserID
  if not runningAsRoot
    then putStrLn "Kiki Daemon must be run as root."
    else serviced daemonConfig
  where
    -- 'simpleDaemon' defaults with only the fields this daemon needs overridden.
    daemonConfig = simpleDaemon
      { privilegedAction = startupAsRoot
      , program = kikidMain
      , user = Just "root"
      , group = Just "root"
      }
-- | Action installed as the daemon's @privilegedAction@ in 'main': writes a
-- single start-up notice to syslog.
startupAsRoot :: IO ()
startupAsRoot = syslog Notice startedNotice
  where
    startedNotice = "kikid Started."
-- | Daemon body, installed as @program@ in 'main'; its argument is ignored.
--
-- Runs a "generation" of worker threads in a loop for as long as the
-- refresh flag is set:
--
--   * fresh message and new-connection queues and an empty client table are
--     created for every generation;
--
--   * SIGHUP closes both queues, which ends the current generation so the
--     loop can start a new one;
--
--   * SIGTERM additionally clears the refresh flag, so the loop ends after
--     the current generation.
--
-- A generation ends as soon as any one worker finishes; if it finished with
-- an exception, that exception is logged. Both queues are then closed.
kikidMain _ = do
  refreshFlag <- newTVarIO True
  whileM_ (readTVarIO refreshFlag) $ do
    incoming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
    newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
    -- The key type has to be the concrete 'ClientID': a lowercase name in
    -- this annotation would be a type variable and constrain nothing.
    currentClients <- newTVarIO M.empty :: IO (TVar (M.Map ClientID ClientState))
    let timeToQuit = atomically (isClosedTBMQueue newchans)
        -- 'addOpenConnections' and 'purgeClosedConnections' stop once
        -- newchans is closed.
        -- NOTE(review): presumably the listener and the queue consumer also
        -- stop on closed queues — confirm in KikiD.PortServer / KikiD.Multiplex.
        closeQueues = closeTBMQueue newchans >> closeTBMQueue incoming
        onTerm = do
          syslog Notice "Caught sigTERM: Shutting down."
          -- One transaction, so no thread can observe the queues closed
          -- while the flag still asks for another generation.
          atomically (writeTVar refreshFlag False >> closeQueues)
        onHup = do
          syslog Notice "Caught sigHUP: Refreshing.."
          atomically closeQueues
    _ <- installHandler sigTERM (Catch onTerm) (Just fullSignalSet)
    _ <- installHandler sigHUP (Catch onHup) (Just fullSignalSet)
    workers <- mapM async
      [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incoming handleMessage
      , addOpenConnections newchans currentClients
      , consumeQueueMicroseconds incoming (qdelay `div` 2) (consumeMessage currentClients)
      , purgeClosedConnections timeToQuit currentClients
      ]
    -- NOTE(review): the remaining workers are not cancelled here; they are
    -- expected to notice the closed queues and exit on their own.
    (_, ex) <- waitAnyCatch workers
    case (ex :: Either SomeException ()) of
      Left e -> syslog Notice ("Exception: " <> show e)
      Right _ -> doNothing
    atomically closeQueues
-- | Register newly accepted connections in the shared client table.
--
-- Until @newchans@ is closed, repeatedly drains every pending
-- @(thread id, client queue)@ pair from it, logs the new connection, stores
-- the queue in @currentClients@ under the key 'threadIdToClient' gives for
-- the thread, and then sleeps for 'qdelay' microseconds before polling
-- again.
addOpenConnections newchans currentClients =
  whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
    whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
      pending <- atomically $ readTBMQueue newchans
      case pending of
        Just (newClientThread, clientQ) -> do -- Is clientQ input or output?
          syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
          -- The local signature pins the table to ClientID keys.
          let register :: M.Map ClientID ClientState -> M.Map ClientID ClientState
              register = M.insert (threadIdToClient newClientThread) (CState clientQ)
          -- Strict modify so inserts do not pile up as thunks inside the TVar.
          atomically $ modifyTVar' currentClients register
        -- readTBMQueue yields Nothing once the queue is closed and drained.
        Nothing -> doNothing
    threadDelay qdelay
-- | Periodically drop clients whose queue has been closed.
--
-- Until the @quit@ action returns 'True', every 'qdelay' microseconds the
-- client table is scanned, every entry whose queue is closed is removed,
-- and the removed entries are reported to syslog.
--
-- The scan and the removal happen in a single STM transaction. Reading a
-- snapshot, checking it, and writing the filtered snapshot back in separate
-- transactions would silently discard any client that another thread
-- inserted in between.
purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
  closing <- atomically $ do
    clients <- readTVar currentClients
    statuses <- forM (M.toList clients) $ \(cid, CState q) -> do
      closed <- isClosedTBMQueue q
      return (closed, cid)
    let closedNow = filter fst statuses
    -- Only touch the TVar when something actually has to go, so idle scans
    -- do not invalidate concurrent transactions.
    unless (null closedNow) $
      writeTVar currentClients $! foldr (M.delete . snd) clients closedNow
    return closedNow
  if null closing
    then doNothing
    else syslog Notice ("Closing connections: " ++ show closing)
  threadDelay qdelay
-- | Per-connection handler passed to 'createTCPPortListener'.
--
-- Reads one line from @hdl@, attempts to deserialize it as a
-- 'KikiDMessage', and writes the outcome to syslog: the decoded message on
-- success, or the raw line plus the decoder's error text on failure.
--
-- @outq@ is accepted but not used here.
handleMessage hdl outq = do
  line <- B.hGetLine hdl
  tid <- myThreadId
  let decoded = Bytes.runGetS Bytes.deserialize line :: Either String KikiDMessage
  -- Build the report for either outcome, then emit it entry by entry.
  mapM_ (syslog Notice) $ case decoded of
    Right msg ->
      [ "Message decoded on thread=" <> show tid
      , "Message: " <> show msg
      ]
    Left str ->
      [ "ERROR: " <> show line
      , "ERROR: Unable to decode message on thread=" <> show tid
      , "ERROR: " ++ str
      ]
-- | Callback given to 'consumeQueueMicroseconds' in 'kikidMain': logs each
-- message it is handed. The client table argument is not used yet.
consumeMessage _currentClients msg = do
  let report = "Recieved Message: " ++ show msg
  void (syslog Notice report)
  -- TODO: Do more here...
|