summaryrefslogtreecommitdiff
path: root/kikid.hs
blob: 04e06d3eb29bec02753553dd845d317903fff6f3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
{-# LANGUAGE OverloadedStrings #-}

import System.Posix.Daemonize
import Control.Concurrent
import System.Posix.Syslog
import System.Posix.Signals
import System.Posix.User (getEffectiveUserID)
import KikiD.PortServer
import KikiD.Multiplex
import KikiD.Message
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.Monoid
import qualified Data.ByteString.Char8 as B
--import Data.Serialize
import qualified Data.Map as M
import qualified Data.Bytes.Serial as Bytes
import qualified Data.Bytes.Get as Bytes
--import qualified Data.Bytes.Put as Bytes

-- TODO: Set these in a config file instead of hard-coding them.

-- | Port value passed as the first argument to 'createTCPPortListener'.
port = 9800

-- | Connection limit passed to 'createTCPPortListener'
-- (presumably the maximum number of simultaneous clients -- confirm
-- against KikiD.PortServer).
max_conns = 100

-- | Capacity given to every bounded queue created with 'newTBMQueueIO'.
qsize :: Int
qsize = 20

-- | Polling interval in microseconds (it is fed to 'threadDelay'); half of
-- this value is used as the interval for the incoming-message consumer.
qdelay :: Int
qdelay = 5 * 10 ^ 5

-- | An 'IO' action with no effect; used as the empty branch of
-- @case@ / @if@ expressions throughout this file.
doNothing :: IO ()
doNothing = return ()

-- | Program entry point.
--
-- Checks the effective user id: when it is not 0 a message is printed and
-- the program returns; otherwise control is handed to 'serviced' with a
-- configuration based on 'simpleDaemon' that runs 'startupAsRoot' as the
-- privileged action and 'kikidMain' as the program, under user and group
-- @root@.
main :: IO ()
main = do
    euid <- getEffectiveUserID
    if euid /= 0
        then putStrLn "Kiki Daemon must be run as root."
        else serviced rootDaemon
  where
    -- 'simpleDaemon' with only the fields this daemon cares about overridden.
    rootDaemon = simpleDaemon
        { privilegedAction = startupAsRoot
        , program = kikidMain
        , user = Just "root"
        , group = Just "root"
        }

-- | Privileged start-up action installed in 'main': it only writes a
-- start-up notice to syslog.
startupAsRoot :: IO ()
startupAsRoot = syslog Notice "kikid Started."

-- | Daemon body, installed as the @program@ field in 'main'; its argument
-- (the result of the privileged action) is ignored.
--
-- Every pass of the outer loop:
--
--   1. creates fresh queues and an empty client table,
--   2. (re)installs the SIGTERM / SIGHUP handlers so that they refer to the
--      queues of this pass,
--   3. starts the four worker threads and blocks until the first of them
--      finishes or throws,
--   4. logs an exception if there was one and closes both queues.
--
-- SIGHUP only closes the queues, so the loop runs another pass with fresh
-- state; SIGTERM additionally clears @refreshFlag@, which ends the loop.
kikidMain :: a -> IO ()
kikidMain _ = do
    refreshFlag <- newTVarIO True
    whileM_ (readTVarIO refreshFlag) $ do
        incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
        newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
        -- The key type is the 'ClientID' alias (the original annotation used a
        -- lowercase, i.e. universally quantified, type variable here).
        currentClients <- newTVarIO M.empty :: IO (TVar (M.Map ClientID ClientState))
        let timeToQuit = atomically $ isClosedTBMQueue newchans
            -- Closing an already closed queue is harmless, so this is shared
            -- by both signal handlers and the end-of-pass cleanup.
            closeQueues = closeTBMQueue newchans >> closeTBMQueue incomming
            onTerm = do
                syslog Notice ("Caught sigTERM: Shutting down.")
                -- One transaction: the flag and the queues change together.
                atomically $ writeTVar refreshFlag False >> closeQueues
            onHup = do
                syslog Notice ("Caught sigHUP: Refreshing..")
                atomically closeQueues
        _ <- installHandler sigTERM (Catch onTerm) (Just fullSignalSet)
        _ <- installHandler sigHUP (Catch onHup) (Just fullSignalSet)
        workers <- mapM async
            [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage
            , addOpenConnections newchans currentClients
            , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients)
            , purgeClosedConnections timeToQuit currentClients
            ]
        -- Returns as soon as ANY worker ends.
        -- NOTE(review): the remaining workers are neither waited on nor
        -- cancelled; 'addOpenConnections' and 'purgeClosedConnections' stop
        -- once @newchans@ is closed below -- confirm the same holds for the
        -- two KikiD.* workers before the next pass re-binds the port.
        (_, ex) <- waitAnyCatch workers
        case (ex :: Either SomeException ()) of
            Left e -> syslog Notice ("Exception: " <> show e)
            Right _ -> doNothing

        atomically closeQueues

-- | State kept for one connected client in the client table.
data ClientState = CState
    { csQueue :: TBMQueue KikiDMessage
      -- ^ The queue that arrived together with the client's thread id on
      -- the new-connections queue.
    }

-- | Clients are identified by the thread that serves their connection.
type ClientID = ThreadId

-- | Turn the thread id of a connection into the key used in the client
-- table (currently the identity).
threadIdToClient tid = tid

-- | Move newly announced connections into the shared client table.
--
-- Until @newchans@ is closed, repeatedly: drain every pending
-- @(thread, queue)@ pair from @newchans@, log it, and insert it into
-- @currentClients@ keyed by 'threadIdToClient'; then sleep for 'qdelay'
-- microseconds before polling again.
--
-- The table is never read here (the original took an unused snapshot of it
-- on every poll purely to pin its type); the signature now does that job.
addOpenConnections
    :: TBMQueue (ThreadId, TBMQueue KikiDMessage)  -- ^ newly accepted connections
    -> TVar (M.Map ClientID ClientState)           -- ^ shared client table
    -> IO ()
addOpenConnections newchans currentClients = whileM_ stillOpen $ do
    whileM_ hasPending $ do
        next <- atomically $ readTBMQueue newchans
        case next of
            Just (newClientThread, clientQ) -> do -- Is clientQ input or output?
                syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
                -- Strict modify so no chain of pending inserts builds up in the TVar.
                atomically $ modifyTVar' currentClients
                    (M.insert (threadIdToClient newClientThread) (CState clientQ))
            -- 'Nothing' means the queue was closed and emptied under us.
            Nothing -> doNothing
    threadDelay qdelay
  where
    stillOpen  = atomically $ fmap not (isClosedTBMQueue newchans)
    hasPending = atomically $ fmap not (isEmptyTBMQueue newchans)

-- | Periodically drop clients whose queue has been closed.
--
-- Runs until the @quit@ action yields 'True'. On each pass the client table
-- is scanned, every entry whose 'CState' queue reports closed is removed,
-- the removed entries are logged, and the thread sleeps for 'qdelay'
-- microseconds.
--
-- The scan and the removal happen inside ONE transaction. Previously the
-- table was snapshotted, inspected, and the filtered snapshot written back
-- in separate transactions, so a client inserted concurrently (see
-- 'addOpenConnections') between the read and the write was overwritten and
-- lost.
purgeClosedConnections
    :: (Ord k, Show k)
    => IO Bool                      -- ^ yields 'True' when the loop should stop
    -> TVar (M.Map k ClientState)   -- ^ shared client table
    -> IO ()
purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
    closing <- atomically $ do
        clients <- readTVar currentClients
        flagged <- forM (M.toList clients) $ \(cid, CState q) -> do
            closed <- isClosedTBMQueue q
            return (closed, cid)
        -- Kept as (flag, id) pairs so the log line keeps its existing format.
        let gone = filter fst flagged
        -- Only touch the TVar when something actually has to be removed.
        unless (null gone) $
            writeTVar currentClients $! foldr (M.delete . snd) clients gone
        return gone
    unless (null closing) $
        syslog Notice ("Closing connections: " ++ show closing)
    threadDelay qdelay

-- | Per-connection message handler passed to 'createTCPPortListener'.
--
-- Reads one line from @hdl@, re-appends the @'\n'@ that 'B.hGetLine'
-- strips, and tries to deserialize the bytes as a 'KikiDMessage'. The
-- outcome is only logged: on success the thread id and the decoded message,
-- on failure the raw line, the thread id and the decoder's error text.
--
-- @outq@ is accepted but not used by this implementation.
handleMessage hdl outq = do
    rawLine <- B.hGetLine hdl
    self <- myThreadId
    let notice msg = syslog Notice msg
        decoded :: Either String KikiDMessage
        decoded = Bytes.runGetS Bytes.deserialize (rawLine `B.snoc` '\n')
    case decoded of
        Left reason -> mapM_ notice
            [ "ERROR: " <> show rawLine
            , "ERROR: Unable to decode message on thread=" <> show self
            , "ERROR: " ++ reason
            ]
        Right msg -> mapM_ notice
            [ "Message decoded on thread=" <> show self
            , "Message: " <> show msg
            ]

-- | Consumer callback for the incoming-message queue: logs the message via
-- 'show' and does nothing else. The client table argument is currently
-- ignored.
consumeMessage currentClients msg = do
    _ <- syslog Notice ("Recieved Message: " ++ show msg)
    return ()
-- TODO: Do more here...