{-# LANGUAGE OverloadedStrings #-}

import System.Posix.Daemonize
import Control.Concurrent
import System.Posix.Syslog
import System.Posix.Signals
import System.Posix.User (getEffectiveUserID)
import KikiD.PortServer
import KikiD.Multiplex
import KikiD.Message
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.Monoid
import qualified Data.ByteString.Char8 as B
--import Data.Serialize
import qualified Data.Map as M
import qualified Data.Bytes.Serial as Bytes
import qualified Data.Bytes.Get as Bytes
--import qualified Data.Bytes.Put as Bytes

-- TODO: Set this in config file
-- NOTE(review): 'port' and 'max_conns' are deliberately left without type
-- signatures; their types are fixed by 'createTCPPortListener', whose
-- signature is not visible from this file.
port = 9800
max_conns = 100

-- | Capacity used for every bounded queue created in this module.
qsize :: Int
qsize = 20

-- | Polling interval, in the units expected by 'threadDelay' (microseconds).
qdelay :: Int
qdelay = 5 * 10 ^ 5

-- | An 'IO' action with no effect; used for the "nothing to do" branches.
doNothing :: IO ()
doNothing = return ()

-- | Entry point. Starts the daemon via 'serviced' when the effective user id
-- is 0; otherwise prints a message and returns.
main :: IO ()
main = do
    me <- getEffectiveUserID
    if me == 0
        then serviced simpleDaemon
                 { privilegedAction = startupAsRoot
                 , program          = kikidMain
                 , user             = Just "root"
                 , group            = Just "root"
                 }
        else putStrLn "Kiki Daemon must be run as root."

-- | Privileged start-up action: only records a syslog notice.
startupAsRoot :: IO ()
startupAsRoot = syslog Notice "kikid Started."

-- | Daemon body. Runs one "generation" of worker threads per iteration of the
-- outer loop and repeats for as long as the refresh flag is 'True'.
--
-- * @sigTERM@ clears the refresh flag and closes both queues.
-- * @sigHUP@ only closes both queues, so the loop goes round again with
--   fresh queues and a fresh client map.
--
-- The argument (the result of the privileged action) is ignored.
kikidMain :: a -> IO ()
kikidMain _ = do
    refreshFlag <- newTVarIO True
    whileM_ (atomically $ readTVar refreshFlag) $ do
        incoming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
        newchans <- newTBMQueueIO qsize
                        :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
        -- The key type is the 'ClientID' alias (the original annotation used
        -- a lowercase type variable here by mistake).
        currentClients <- newTVarIO M.empty
                              :: IO (TVar (M.Map ClientID ClientState))

        let timeToQuit  = atomically (isClosedTBMQueue newchans)
            closeQueues = closeTBMQueue newchans >> closeTBMQueue incoming

        -- Handlers are re-installed every iteration because they close over
        -- this iteration's queues.
        void $ installHandler sigTERM
            (Catch $ do
                syslog Notice "Caught sigTERM: Shutting down."
                atomically $ writeTVar refreshFlag False >> closeQueues)
            (Just fullSignalSet)

        void $ installHandler sigHUP
            (Catch $ do
                syslog Notice "Caught sigHUP: Refreshing.."
                atomically closeQueues)
            (Just fullSignalSet)

        workers <- mapM async
            [ createTCPPortListener port "KikiD" qdelay qsize max_conns
                                    newchans incoming handleMessage
            , addOpenConnections newchans currentClients
            , consumeQueueMicroseconds incoming (qdelay `div` 2)
                                       (consumeMessage currentClients)
            , purgeClosedConnections timeToQuit currentClients
            ]

        -- Block until the first worker finishes (normally or by exception).
        -- NOTE(review): the remaining workers are not cancelled or waited
        -- for here; they are only signalled through the queue closes below.
        -- Confirm that each of them really terminates on a closed queue.
        (_, outcome) <- waitAnyCatch workers
        case (outcome :: Either SomeException ()) of
            Left e  -> syslog Notice ("Exception: " <> show e)
            Right _ -> doNothing

        atomically closeQueues

-- | Per-client state kept in the client map: the queue associated with the
-- client's connection.
data ClientState = CState {csQueue :: TBMQueue KikiDMessage}

-- | Clients are identified by the 'ThreadId' of their connection thread.
type ClientID = ThreadId

-- | Convert a connection thread id into a client id (currently the identity).
threadIdToClient :: ThreadId -> ClientID
threadIdToClient = id

-- | Register newly announced connections in the client map.
--
-- Until @newchans@ is closed: drain every @(thread, queue)@ pair currently
-- waiting in @newchans@, insert each into @currentClients@, then sleep for
-- 'qdelay'.
addOpenConnections
    :: TBMQueue (ThreadId, TBMQueue KikiDMessage)
    -> TVar (M.Map ClientID ClientState)
    -> IO ()
addOpenConnections newchans currentClients =
    whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
        whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
            x <- atomically $ readTBMQueue newchans
            case x of
                Just (newClientThread, clientQ) -> do -- Is clientQ input or output?
                    syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
                    -- Strict variant so the map is not left as a chain of
                    -- unevaluated inserts inside the TVar.
                    atomically $ modifyTVar' currentClients
                        (M.insert (threadIdToClient newClientThread) (CState clientQ))
                Nothing -> doNothing
        -- NOTE(review): the delay is taken to belong to the outer polling
        -- loop (once per drain pass), not to the inner drain loop.
        threadDelay qdelay

-- | Periodically drop clients whose queue has been closed.
--
-- Runs until the @quit@ action yields 'True'. On every pass the client map is
-- inspected and updated inside a single STM transaction, so an insert made by
-- 'addOpenConnections' between the inspection and the update cannot be lost.
purgeClosedConnections
    :: IO Bool
    -> TVar (M.Map ClientID ClientState)
    -> IO ()
purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
    closedIds <- atomically $ do
        clients <- readTVar currentClients
        closed  <- fmap (fmap fst)
                 . filterM (isClosedTBMQueue . csQueue . snd)
                 $ M.toList clients
        unless (null closed) $
            writeTVar currentClients $! foldr M.delete clients closed
        return closed
    -- The message keeps its historical shape: a list of (True, clientId).
    unless (null closedIds) $
        syslog Notice ("Closing connections: " ++ show [(True, cid) | cid <- closedIds])
    threadDelay qdelay

-- | Read one line from the handle @hdl@, try to decode it as a
-- 'KikiDMessage', and log the outcome to syslog. The line terminator removed
-- by 'B.hGetLine' is re-appended before decoding. The second argument is
-- currently unused.
handleMessage hdl outq = do
    line <- B.hGetLine hdl
    tid <- myThreadId
    case (Bytes.runGetS Bytes.deserialize (B.snoc line '\n') :: Either String KikiDMessage) of
        Right msg -> do
            syslog Notice ("Message decoded on thread=" <> show tid)
            syslog Notice ("Message: " <> show msg)
        Left str -> do
            syslog Notice ("ERROR: " <> show line)
            syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
            syslog Notice ("ERROR: " ++ str)

-- | Handle one message taken from the incoming queue. For now it is only
-- logged; the client map argument is not used yet.
consumeMessage :: Show msg => clients -> msg -> IO ()
consumeMessage currentClients msg =
    syslog Notice ("Recieved Message: " ++ show msg)
    -- TODO: Do more here...