{-# LANGUAGE OverloadedStrings #-}

-- Entry point for the Kiki daemon: daemonizes, starts a TCP listener and
-- keeps a table of connected clients until told to shut down.

import System.Posix.Daemonize
import Control.Concurrent
import System.Posix.Syslog
import System.Posix.Signals
import System.Posix.User (getEffectiveUserID)
import KikiD.PortServer
import KikiD.Multiplex
import KikiD.Message
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.Monoid
import qualified Data.ByteString.Char8 as B
import Data.Serialize
import qualified Data.Map as M
import System.IO (Handle)

-- TODO: Set this in config file
-- 'port' and 'max_conns' are left without signatures on purpose: their types
-- are fixed by 'createTCPPortListener', which is defined outside this file.
port = 9800
max_conns = 100

-- | Capacity used for every bounded queue created in this file.
qsize :: Int
qsize = 20

-- | Polling interval, in microseconds, handed to 'threadDelay'.
qdelay :: Int
qdelay = 5*10^5

-- | An IO action that does nothing.
doNothing :: IO ()
doNothing = return ()

-- | Start the daemon when the effective user is root; otherwise print a
-- message and exit.
main :: IO ()
main = do
  me <- getEffectiveUserID
  if me == 0
    then serviced simpleDaemon
           { privilegedAction = startupAsRoot
           , program = kikidMain
           , user = Just "root"
           , group = Just "root"
           }
    else putStrLn "Kiki Daemon must be run as root."

-- | Privileged start-up hook: records the start in syslog.
startupAsRoot :: IO ()
startupAsRoot = syslog Notice "kikid Started."

-- | Daemon body. Each pass of the loop creates fresh queues and a fresh client
-- table, installs the signal handlers, starts the four worker threads and
-- waits for the first of them to finish. sigHUP closes the queues so the loop
-- starts another pass; sigTERM additionally clears the refresh flag so the
-- loop ends.
kikidMain :: a -> IO ()
kikidMain _ = do
  refreshFlag <- newTVarIO True
  whileM_ (readTVarIO refreshFlag) $ do
    incoming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
    newchans <- newTBMQueueIO qsize
                  :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
    -- The key type is the 'ClientID' alias; the original annotation spelled it
    -- with a lowercase initial, which made it an unrelated type variable.
    currentClients <- newTVarIO M.empty
                        :: IO (TVar (M.Map ClientID ClientState))
    let timeToQuit = atomically (isClosedTBMQueue newchans)
        closeQueues = atomically $ do
          closeTBMQueue newchans
          closeTBMQueue incoming
    _ <- installHandler sigTERM
           (Catch $ do
              syslog Notice ("Caught sigTERM: Shutting down.")
              -- Clear the flag before closing the queues so the outer loop
              -- cannot observe closed queues with the flag still set.
              atomically $ writeTVar refreshFlag False
              closeQueues)
           (Just fullSignalSet)
    _ <- installHandler sigHUP
           (Catch $ do
              syslog Notice ("Caught sigHUP: Refreshing..")
              closeQueues)
           (Just fullSignalSet)
    workers <- mapM async
      [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incoming handleMessage
      , addOpenConnections newchans currentClients
      , consumeQueueMicroseconds incoming (qdelay `div` 2) (consumeMessage currentClients)
      , purgeClosedConnections timeToQuit currentClients
      ]
    -- NOTE(review): only the first worker to finish is awaited; the others are
    -- expected to stop on their own once the queues are closed below.
    (_, ex) <- waitAnyCatch workers
    case (ex :: Either SomeException ()) of
      Left e -> syslog Notice ("Exception: " <> show e)
      Right _ -> doNothing
    closeQueues

-- | Per-client state: the message queue associated with one connection.
data ClientState = CState {cs_queue :: TBMQueue KikiDMessage}

-- | Clients are identified by the thread that serves them.
type ClientID = ThreadId

-- | Convert a serving thread's id into the client key (currently the identity).
threadIdToClient :: ThreadId -> ClientID
threadIdToClient = id

-- | Until @newchans@ is closed, drain it every 'qdelay' microseconds and
-- register each announced connection in the client table.
addOpenConnections
  :: TBMQueue (ThreadId, TBMQueue KikiDMessage)
  -> TVar (M.Map ClientID ClientState)
  -> IO ()
addOpenConnections newchans currentClients =
  whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
    whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
      announced <- atomically $ readTBMQueue newchans
      case announced of
        Just (newClientThread, clientQ) -> do -- Is clientQ input or output?
          syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
          -- Strict modify so no chain of pending inserts builds up in the TVar.
          atomically $ modifyTVar' currentClients
            (M.insert (threadIdToClient newClientThread) (CState clientQ))
        Nothing -> doNothing
    threadDelay qdelay

-- | Until @quit@ yields 'True', every 'qdelay' microseconds remove from the
-- client table every client whose queue has been closed.
--
-- The check and the removal happen in one STM transaction and delete only the
-- closed keys. Writing a filtered snapshot back, as the previous version did,
-- would silently drop any client registered by 'addOpenConnections' between
-- the snapshot and the write.
purgeClosedConnections
  :: (Ord k, Show k)
  => IO Bool
  -> TVar (M.Map k ClientState)
  -> IO ()
purgeClosedConnections quit currentClients =
  whileM_ (fmap not quit) $ do
    closedIds <- atomically $ do
      clients <- readTVar currentClients
      closed <- filterM (isClosedTBMQueue . cs_queue . snd) (M.toList clients)
      let ids = fmap fst closed
      unless (null ids) $
        modifyTVar' currentClients (\m -> foldr M.delete m ids)
      return ids
    -- Logged as (True, key) pairs to keep the established log line format.
    unless (null closedIds) $
      syslog Notice ("Closing connections: " ++ show [(True, cid) | cid <- closedIds])
    threadDelay qdelay

-- | Read one line from the handle, try to decode it as a 'KikiDMessage' and
-- log the outcome. The second argument is not used yet.
handleMessage :: Handle -> q -> IO ()
handleMessage hdl _outq = do
  line <- B.hGetLine hdl
  tid <- myThreadId
  case (decode line :: Either String KikiDMessage) of
    Right _ -> syslog Notice ("Message decoded on thread=" <> show tid)
    Left str -> do
      syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
      syslog Notice str

-- | Log a received message. The client table argument is not used yet.
consumeMessage :: Show msg => clients -> msg -> IO ()
consumeMessage _currentClients msg =
  syslog Notice ("Recieved Message: " ++ show msg)
  -- TODO: Do more here...