summaryrefslogtreecommitdiff
path: root/kikid.hs
blob: 31426a325f8f33532e1a938487a416fbf87f64c9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
{-# LANGUAGE OverloadedStrings #-}

import System.Posix.Daemonize
import Control.Concurrent
import System.Posix.Syslog
import System.Posix.Signals
import System.Posix.User (getEffectiveUserID)
import KikiD.PortServer
import KikiD.Multiplex
import KikiD.Message
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.Monoid
import qualified Data.ByteString.Char8 as B
import Data.Serialize
import qualified Data.Map as M

-- TODO: Set this in config file

-- | TCP port handed to 'createTCPPortListener'.
port = 9800

-- | Limit handed to 'createTCPPortListener'
-- (presumably the maximum number of simultaneous connections; confirm in KikiD.PortServer).
max_conns = 100

-- | Bound used for every 'TBMQueue' created by the daemon.
qsize = 20

-- | Polling interval in microseconds (half a second), as passed to 'threadDelay'.
qdelay = 500000

-- | An 'IO' action with no effect; used to fill the "nothing to do" branch
-- of conditionals and case expressions.
doNothing :: IO ()
doNothing = return ()

-- | Program entry point.
--
-- When the effective user ID is 0 the process is handed over to 'serviced'
-- with 'startupAsRoot' as the privileged action and 'kikidMain' as the
-- program, configured to run as user and group @root@. For any other user
-- a notice is printed on standard output and the program returns.
main :: IO ()
main = getEffectiveUserID >>= launch
  where
    -- Only the super-user (uid 0) is allowed to start the daemon.
    launch 0 = serviced daemon
    launch _ = putStrLn "Kiki Daemon must be run as root."

    daemon = simpleDaemon { privilegedAction = startupAsRoot
                          , program = kikidMain
                          , user = Just "root", group = Just "root" }

-- | Privileged start-up action handed to 'serviced' by 'main':
-- writes a start notice to syslog and does nothing else.
startupAsRoot :: IO ()
startupAsRoot = syslog Notice "kikid Started."

-- | Daemon body, installed as the @program@ of the daemon in 'main'.
-- Its argument (the result of the privileged action) is ignored.
--
-- The daemon runs in "generations". Each generation creates a fresh
-- incoming-message queue, a fresh new-connection queue and an empty client
-- map, installs the signal handlers, and starts four worker threads:
--
--   * the TCP listener ('createTCPPortListener'),
--   * 'addOpenConnections', which registers new clients in the map,
--   * 'consumeQueueMicroseconds', which feeds incoming messages to
--     'consumeMessage',
--   * 'purgeClosedConnections', which drops clients whose queue is closed.
--
-- A generation ends as soon as /any/ worker returns or throws; an exception
-- is logged to syslog. Both queues are then closed and, while the refresh
-- flag is still set, a new generation starts.
--
-- Signals:
--
--   * @SIGTERM@ clears the refresh flag and closes both queues (shutdown).
--   * @SIGHUP@ only closes both queues, which leads to a new generation.
kikidMain :: a -> IO ()
kikidMain _ = do
    refreshFlag <- newTVarIO True
    whileM_ (readTVarIO refreshFlag) $ do
        incoming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
        newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
        -- The key type is the 'ClientID' alias; a lowercase name here would be
        -- read as a type variable and leave the annotation without effect.
        currentClients <- newTVarIO M.empty :: IO (TVar (M.Map ClientID ClientState))
        let timeToQuit  = atomically $ isClosedTBMQueue newchans
            -- Closing a queue that is already closed is harmless, so this is
            -- shared by both signal handlers and the end-of-generation cleanup.
            closeQueues = atomically $ closeTBMQueue newchans >> closeTBMQueue incoming
        _ <- installHandler sigTERM
                 (Catch $ do syslog Notice ("Caught sigTERM: Shutting down.")
                             atomically $ writeTVar refreshFlag False
                             closeQueues)
                 (Just fullSignalSet)
        _ <- installHandler sigHUP
                 (Catch $ do syslog Notice ("Caught sigHUP: Refreshing..")
                             closeQueues)
                 (Just fullSignalSet)
        workers <- mapM async
            [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incoming handleMessage
            , addOpenConnections newchans currentClients
            , consumeQueueMicroseconds incoming (qdelay `div` 2) (consumeMessage currentClients)
            , purgeClosedConnections timeToQuit currentClients
            ]
        -- Wait for the first worker to stop, whatever the reason.
        (_, outcome) <- waitAnyCatch workers
        case (outcome :: Either SomeException ()) of
            Left e  -> syslog Notice ("Exception: " <> show e)
            Right _ -> return ()

        -- NOTE(review): the remaining workers are not waited for or cancelled;
        -- they are expected to wind down once they see the closed queues.
        -- Confirm that the listener has released the port before the next
        -- generation tries to bind it again.
        closeQueues

-- | Key under which a connected client is stored in the client map.
-- Clients are identified by a 'ThreadId'.
type ClientID = ThreadId

-- | What the daemon remembers about one connected client.
data ClientState = CState
    { cs_queue :: TBMQueue KikiDMessage
      -- ^ The message queue registered together with the client's thread.
    }

-- | Turn a connection's 'ThreadId' into the 'ClientID' used as map key.
-- Both are the same type, so this is the identity.
threadIdToClient :: ThreadId -> ClientID
threadIdToClient tid = tid

-- | Register newly announced connections in the shared client map.
--
-- Until @newchans@ is closed, this repeatedly drains every pending
-- @(thread, queue)@ pair from it, logs the new connection and inserts it
-- into @currentClients@, then sleeps for 'qdelay' microseconds before
-- polling again.
--
-- [@newchans@] queue on which new connections are announced
-- [@currentClients@] shared map of the clients currently known
addOpenConnections :: TBMQueue (ThreadId, TBMQueue KikiDMessage)
                   -> TVar (M.Map ClientID ClientState)
                   -> IO ()
addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
    whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
        x <- atomically $ readTBMQueue newchans
        case x of
            Just (newClientThread,clientQ) -> do -- Is clientQ input or output?
                syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
                -- Strict update: do not pile up unevaluated inserts inside the TVar.
                atomically $ modifyTVar' currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ))
            -- Queue was closed and emptied between the emptiness check and the read.
            Nothing -> return ()
    threadDelay qdelay

-- | Periodically remove clients whose queue has been closed.
--
-- Until the @quit@ action yields 'True', each pass finds every client in
-- @currentClients@ whose 'cs_queue' is closed, deletes exactly those
-- entries, logs them to syslog when there are any, and then sleeps for
-- 'qdelay' microseconds.
--
-- The lookup and the deletion happen in one STM transaction. Reading the map
-- in one transaction and overwriting it in another would discard any client
-- that was inserted in between.
--
-- [@quit@] polled before every pass; the loop ends once it returns 'True'
-- [@currentClients@] shared map of the clients currently known
purgeClosedConnections :: IO Bool -> TVar (M.Map ClientID ClientState) -> IO ()
purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
    closedIds <- atomically $ do
        clients <- readTVar currentClients
        closed  <- filterM (isClosedTBMQueue . cs_queue . snd) (M.toList clients)
        let ids = fmap fst closed
        -- Only touch the TVar when something actually has to go.
        unless (null ids) $
            writeTVar currentClients $! foldr M.delete clients ids
        return ids
    -- Same shape as before in the log line: a list of (True, client) pairs.
    let closing = [ (True, cid) | cid <- closedIds ]
    unless (null closing) $
        syslog Notice ("Closing connections: " ++ show closing)
    threadDelay qdelay

-- | Per-connection handler passed to 'createTCPPortListener'.
--
-- Reads one line from @hdl@, tries to decode it as a 'KikiDMessage' and
-- reports the outcome to syslog together with the current thread id. On a
-- decoding failure the decoder's error text is logged as a second entry.
--
-- NOTE(review): the decoded message is discarded and @outq@ is never used;
-- presumably the message is meant to be forwarded to @outq@ — confirm
-- against KikiD.PortServer.
handleMessage hdl outq = do
    tid  <- myThreadId
    line <- B.hGetLine hdl
    either (decodeFailed tid) (const (decodeOk tid))
           (decode line :: Either String KikiDMessage)
  where
    decodeOk tid =
        syslog Notice ("Message decoded on thread=" <> show tid)
    decodeFailed tid reason = do
        syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
        syslog Notice reason

-- | Consumer applied to every message taken from the incoming queue.
-- For now it only writes the message to syslog; the client map it is given
-- is not consulted.
consumeMessage _currentClients msg =
    syslog Notice ("Recieved Message: " ++ show msg)
-- TODO: Do more here...