summaryrefslogtreecommitdiff
path: root/kikid.hs
blob: 059a5df01849f7920486988fa93888cfc2899913 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE DoAndIfThenElse #-}

import System.Posix.Daemonize
import Control.Concurrent
import System.Posix.Syslog
import System.Posix.Signals
import System.Posix.User (getEffectiveUserID)

import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.Monoid
import qualified Data.ByteString.Char8 as B
--import Data.Serialize
import qualified Data.Map as M
import qualified Data.Bytes.Serial as Bytes
import qualified Data.Bytes.Get as Bytes
import qualified Data.Bytes.Put as Bytes

import KikiD.PortServer
import KikiD.Multiplex
import KikiD.Message
import KikiD.ClientState

-- TODO: Set this in config file
port = 9800
max_conns = 100
qsize = 20
qdelay = 5*10^5

doNothing = return ()

main = do
    me <- getEffectiveUserID
    if me == 0 then     
     serviced simpleDaemon { privilegedAction = startupAsRoot
                           , program = kikidMain
                           , user = Just "root", group = Just "root" }
     else putStrLn "Kiki Daemon must be run as root."

startupAsRoot = syslog Notice "kikid Started."

kikidMain _ = do
    refreshFlag <- atomically $ newTVar True
    whileM_ (atomically $ readTVar refreshFlag) $ do
        incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
        newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
        currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState))
        let timeToQuit = atomically . isClosedTBMQueue $ newchans
        installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.")
                                          atomically $ writeTVar refreshFlag False
                                          atomically $ closeTBMQueue newchans
                                          atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
        installHandler sigHUP (Catch (do  syslog Notice ("Caught sigHUP: Refreshing..")
                                          atomically $ closeTBMQueue newchans
                                          atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
        (_,ex) <- join . fmap waitAnyCatch $ mapM async 
                        [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage
                        , addOpenConnections newchans currentClients
                        , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients)
                        , purgeClosedConnections timeToQuit currentClients
                        ]
        case (ex::Either SomeException ()) of
            Left e -> syslog Notice ("Exception: " <> show e)
            Right _ -> doNothing

        atomically $ closeTBMQueue newchans
        atomically $ closeTBMQueue incomming

addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
    cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState)
    whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
        x <- atomically $ readTBMQueue newchans
        case x of
            Just (newClientThread,clientQ) -> do -- Is clientQ input or output?
                syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
                atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ))
            Nothing -> doNothing
    threadDelay qdelay

purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
    map <- atomically $ readTVar currentClients
    let k = M.keys map
        e = M.elems map
    closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e
    let f False a b = [(a,b)] -- still open
        f True _ _  = []       -- closed
        closing = filter fst (zip closedClients k)
    if (not . null $ closing)
        then syslog Notice ("Closing connections: " ++ show closing)
        else doNothing
    atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e
    threadDelay qdelay

handleMessage hdl outq = do 
    line <- B.hGetLine hdl
    tid <- myThreadId
    case (Bytes.runGetS Bytes.deserialize line :: Either String KikiDMessage) of
        Right msg -> do
            syslog Notice ("Message decoded on thread=" <> show tid)
            syslog Notice ("Message: " <> show msg)
        Left str -> do
            syslog Notice ("ERROR: " <> show line)
            syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
            syslog Notice ("ERROR: " ++ str)

consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg)
-- TODO: Do more here...