1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
{-# LANGUAGE OverloadedStrings #-}
import System.Posix.Daemonize
import Control.Concurrent
import System.Posix.Syslog
import System.Posix.Signals
import System.Posix.User (getEffectiveUserID)
import KikiD.PortServer
import KikiD.Multiplex
import KikiD.Message
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.Monoid
import qualified Data.ByteString.Char8 as B
--import Data.Serialize
import qualified Data.Map as M
import qualified Data.Bytes.Serial as Bytes
import qualified Data.Bytes.Get as Bytes
--import qualified Data.Bytes.Put as Bytes
-- TODO: Set this in config file
port = 9800
max_conns = 100
qsize = 20
qdelay = 5*10^5
doNothing = return ()
main = do
me <- getEffectiveUserID
if me == 0 then
serviced simpleDaemon { privilegedAction = startupAsRoot
, program = kikidMain
, user = Just "root", group = Just "root" }
else putStrLn "Kiki Daemon must be run as root."
startupAsRoot = syslog Notice "kikid Started."
kikidMain _ = do
refreshFlag <- atomically $ newTVar True
whileM_ (atomically $ readTVar refreshFlag) $ do
incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState))
let timeToQuit = atomically . isClosedTBMQueue $ newchans
installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.")
atomically $ writeTVar refreshFlag False
atomically $ closeTBMQueue newchans
atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
installHandler sigHUP (Catch (do syslog Notice ("Caught sigHUP: Refreshing..")
atomically $ closeTBMQueue newchans
atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
(_,ex) <- join . fmap waitAnyCatch $ mapM async
[ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage
, addOpenConnections newchans currentClients
, consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients)
, purgeClosedConnections timeToQuit currentClients
]
case (ex::Either SomeException ()) of
Left e -> syslog Notice ("Exception: " <> show e)
Right _ -> doNothing
atomically $ closeTBMQueue newchans
atomically $ closeTBMQueue incomming
data ClientState = CState {csQueue :: TBMQueue KikiDMessage}
type ClientID = ThreadId
threadIdToClient = id
addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState)
whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
x <- atomically $ readTBMQueue newchans
case x of
Just (newClientThread,clientQ) -> do -- Is clientQ input or output?
syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ))
Nothing -> doNothing
threadDelay qdelay
purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
map <- atomically $ readTVar currentClients
let k = M.keys map
e = M.elems map
closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e
let f False a b = [(a,b)] -- still open
f True _ _ = [] -- closed
closing = filter fst (zip closedClients k)
if (not . null $ closing)
then syslog Notice ("Closing connections: " ++ show closing)
else doNothing
atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e
threadDelay qdelay
handleMessage hdl outq = do
line <- B.hGetLine hdl
tid <- myThreadId
case (Bytes.runGetS Bytes.deserialize (B.snoc line '\n') :: Either String KikiDMessage) of
Right msg -> do
syslog Notice ("Message decoded on thread=" <> show tid)
syslog Notice ("Message: " <> show msg)
Left str -> do
syslog Notice ("ERROR: " <> show line)
syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
syslog Notice ("ERROR: " ++ str)
consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg)
-- TODO: Do more here...
|