{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE DoAndIfThenElse #-}
import System.Posix.Daemonize
import Control.Concurrent
import System.Posix.Syslog
import System.Posix.Signals
import System.Posix.User (getEffectiveUserID)
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.Monoid
import qualified Data.ByteString.Char8 as B
--import Data.Serialize
import qualified Data.Map as M
import qualified Data.Bytes.Serial as Bytes
import qualified Data.Bytes.Get as Bytes
import qualified Data.Bytes.Put as Bytes

import KikiD.PortServer
import KikiD.Multiplex
import KikiD.Message
import KikiD.ClientState

-- TODO: Set this in config file
-- | Port handed to 'createTCPPortListener'.
port = 9800

-- | Connection limit handed to 'createTCPPortListener'.
max_conns = 100

-- | Capacity of every bounded queue created by 'kikidMain'.
qsize :: Int
qsize = 20

-- | Polling interval, in microseconds (it is passed to 'threadDelay').
qdelay :: Int
qdelay = 5*10^5

-- | An action that does nothing; used for "nothing to do" branches.
doNothing :: IO ()
doNothing = return ()

-- | Entry point: start the daemon when running with effective uid 0,
-- otherwise print a message and return.
main :: IO ()
main = do
    me <- getEffectiveUserID
    if me == 0
        then serviced simpleDaemon { privilegedAction = startupAsRoot
                                   , program = kikidMain
                                   , user = Just "root", group = Just "root" }
        else putStrLn "Kiki Daemon must be run as root."

-- | Privileged start-up hook: logs that the daemon has started.
startupAsRoot :: IO ()
startupAsRoot = syslog Notice "kikid Started."

-- | Daemon body.  Each pass of the loop creates fresh queues and a fresh
-- client map, installs the signal handlers for them, starts the four worker
-- threads and waits until the first of them finishes.  The loop repeats for
-- as long as the refresh flag is 'True'; the sigTERM handler is the only
-- place that clears it.
kikidMain _ = do
    refreshFlag <- newTVarIO True
    whileM_ (readTVarIO refreshFlag) $ do
        incoming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
        newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
        currentClients <- newTVarIO M.empty :: IO (TVar (M.Map ClientID ClientState))
        let timeToQuit = atomically . isClosedTBMQueue $ newchans
            -- Closing the queues is what tells the workers to stop.
            closeQueues = atomically $ do
                closeTBMQueue newchans
                closeTBMQueue incoming
        -- The handlers capture this pass's queues, so they are reinstalled
        -- on every pass.
        _ <- installHandler sigTERM
                 (Catch (do syslog Notice ("Caught sigTERM: Shutting down.")
                            atomically $ writeTVar refreshFlag False
                            closeQueues))
                 (Just fullSignalSet)
        _ <- installHandler sigHUP
                 (Catch (do syslog Notice ("Caught sigHUP: Refreshing..")
                            closeQueues))
                 (Just fullSignalSet)
        workers <- mapM async
            [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incoming handleMessage
            , addOpenConnections newchans currentClients
            , consumeQueueMicroseconds incoming (qdelay `div` 2) (consumeMessage currentClients)
            , purgeClosedConnections timeToQuit currentClients
            ]
        (_, ex) <- waitAnyCatch workers
        case (ex :: Either SomeException ()) of
            Left e  -> syslog Notice ("Exception: " <> show e)
            Right _ -> doNothing
        -- Whichever worker ended first, close the queues so the rest wind
        -- down as well.
        closeQueues

-- | Register every connection announced on @newchans@ in the shared client
-- map, polling every 'qdelay' microseconds until @newchans@ is closed.
addOpenConnections :: TBMQueue (ThreadId, TBMQueue KikiDMessage)
                   -> TVar (M.Map ClientID ClientState)
                   -> IO ()
addOpenConnections newchans currentClients =
    whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
        -- Drain everything that is queued right now, then sleep.
        whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
            x <- atomically $ readTBMQueue newchans
            case x of
                Just (newClientThread, clientQ) -> do -- Is clientQ input or output?
                    syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
                    atomically $ modifyTVar' currentClients
                        (M.insert (threadIdToClient newClientThread) (CState clientQ))
                Nothing -> doNothing
        threadDelay qdelay

-- | Remove clients whose queue has been closed from the shared client map,
-- polling every 'qdelay' microseconds until @quit@ yields 'True'.
--
-- The closed-queue check and the map update run in one transaction, so an
-- insertion made concurrently by 'addOpenConnections' cannot be overwritten
-- by a stale snapshot of the map.
purgeClosedConnections :: IO Bool -> TVar (M.Map ClientID ClientState) -> IO ()
purgeClosedConnections quit currentClients =
    whileM_ (fmap not quit) $ do
        closedIds <- atomically $ do
            clients <- readTVar currentClients
            closed  <- filterM (\(_, CState q) -> isClosedTBMQueue q) (M.toList clients)
            let ids = map fst closed
            unless (null ids) $
                writeTVar currentClients $! foldr M.delete clients ids
            return ids
        -- Keep the established log format: a list of (True, clientId) pairs.
        let closing = zip (repeat True) closedIds
        unless (null closing) $
            syslog Notice ("Closing connections: " ++ show closing)
        threadDelay qdelay

-- | Per-connection callback passed to 'createTCPPortListener': read one line
-- from @hdl@, try to decode it as a 'KikiDMessage' and log the outcome.
-- @outq@ is not used yet.
handleMessage hdl outq = do
    line <- B.hGetLine hdl
    tid <- myThreadId
    case (Bytes.runGetS Bytes.deserialize line :: Either String KikiDMessage) of
        Right msg -> do
            syslog Notice ("Message decoded on thread=" <> show tid)
            syslog Notice ("Message: " <> show msg)
        Left str -> do
            syslog Notice ("ERROR: " <> show line)
            syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
            syslog Notice ("ERROR: " ++ str)

-- | Queue consumer passed to 'consumeQueueMicroseconds': currently it only
-- logs the message; @currentClients@ is not used yet.
consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg)
-- TODO: Do more here...