From d420aefe0b14081b00d4655f9e8317903b5b02c3 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sun, 21 Jun 2015 07:36:46 -0400 Subject: kikid: The Beginnings of the Kiki Daemon --- KikiD/GetLine.hs | 18 +++++++ KikiD/Message.hs | 18 +++++++ KikiD/Multiplex.hs | 100 ++++++++++++++++++++++++++++++++++++ KikiD/PortServer.hs | 145 ++++++++++++++++++++++++++++++++++++++++++++++++++++ kiki.cabal | 18 ++++++- kikid.hs | 107 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 405 insertions(+), 1 deletion(-) create mode 100644 KikiD/GetLine.hs create mode 100644 KikiD/Message.hs create mode 100644 KikiD/Multiplex.hs create mode 100644 KikiD/PortServer.hs create mode 100644 kikid.hs diff --git a/KikiD/GetLine.hs b/KikiD/GetLine.hs new file mode 100644 index 0000000..8af5dc6 --- /dev/null +++ b/KikiD/GetLine.hs @@ -0,0 +1,18 @@ +module KikiD.GetLine where + +import Control.Monad +import Data.Serialize +import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as L +import Data.Monoid +import Data.Binary.Builder + +getLine :: Get BS.ByteString +getLine = getWords empty + where + getWords b = do + w <- getWord8 + let x = singleton w + if (w == 10 || w == 0) + then return $ BS.concat . L.toChunks . toLazyByteString $ b <> x + else getWords (b <> x) diff --git a/KikiD/Message.hs b/KikiD/Message.hs new file mode 100644 index 0000000..c4903cc --- /dev/null +++ b/KikiD/Message.hs @@ -0,0 +1,18 @@ +module KikiD.Message where + +import Data.Serialize +import qualified KikiD.GetLine +import qualified Data.ByteString.Char8 as B +import Data.Monoid +import Text.Read + +data KikiDMessage = TODO deriving (Show,Read) + +instance Serialize KikiDMessage where + put = putByteString . B.pack . show + get = do + x <- KikiD.GetLine.getLine + case (readEither (B.unpack x) :: Either String KikiDMessage) of + Right m -> return m + Left er -> fail ("PARSE ERROR: " <> er) + diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs new file mode 100644 index 0000000..4a31127 --- /dev/null +++ b/KikiD/Multiplex.hs @@ -0,0 +1,100 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE BangPatterns #-} +module KikiD.Multiplex where + +import System.IO +import qualified Data.ByteString.Char8 as B +import Data.Monoid +import Control.Concurrent.STM +import Data.Map.Strict as M +import Control.Monad +import Control.Concurrent +import qualified Data.Binary as Bin +import Control.Concurrent.STM.TBMQueue +import Control.Monad.Loops +import Data.List +import Data.Maybe + +-- | pipeTransHookMicroseconds +-- +-- This function indefinitely reads the @fromChan@ queue and applies +-- the function @translate@ to the contents before passing it on to the +-- @toChan@ queue. The @triggerAction@ is performed on the message prior +-- to the translation. The @fromChan@ queue is checked every @micros@ +-- microseconds from the last emptying. +-- +-- To terminate the thread, close @fromChan@ queue. +-- +pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO () +pipeTransHookMicroseconds fromChan toChan micros translate triggerAction = + whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do + whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do + msg <- atomically $ readTBMQueue fromChan + case msg of + Just m' -> do + x <- triggerAction m' + case translate x m' of + Just m -> atomically $ writeTBMQueue toChan m + _ -> return () + _ -> return () + threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads + +pipeTransHook fromChan toChan translate triggerAction = + pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction + +pipeTrans fromChan toChan translate = + pipeTransHook fromChan toChan translate (void . return) + +pipeHook fromChan toChan triggerAction = + pipeTransHook fromChan toChan id triggerAction + +pipeQueue fromChan toChan = + pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return) + +teePipeQueueMicroseconds fromChan toChan1 toChan2 micros = + whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do + whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do + msg <- atomically $ readTBMQueue fromChan + case msg of + Just m -> do + atomically $ writeTBMQueue toChan1 m + atomically $ writeTBMQueue toChan2 m + _ -> return () + threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads + +teePipeQueue fromChan toChan1 toChan2 = + teePipeQueueMicroseconds fromChan toChan1 toChan2 5000 + + +-- Deprecated: Use consumeQueueMicroseconds +-- TODO: Remove this +withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do + whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do + t <- atomically $ readTBMQueue fromChan + case t of + Just x -> action x + Nothing -> return () + threadDelay delay + +{-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-} +withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action +{-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-} + +-- | consumeQueueMicroseconds +-- (as of version 1.0.4) +-- +-- Continously run the provided action on items +-- from the provided queue. Delay for provided +-- microseconds each time the queue is emptied. +consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do + whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do + x <- atomically $ readTBMQueue q + case x of + Just s -> action s + Nothing -> return () + threadDelay micros diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs new file mode 100644 index 0000000..31101a7 --- /dev/null +++ b/KikiD/PortServer.hs @@ -0,0 +1,145 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE BangPatterns #-} +module KikiD.PortServer (createTCPPortListener) where + +import qualified Data.ByteString.Char8 as B +import Network.Socket hiding (send) +import Network.Socket.ByteString +import Data.Monoid ((<>)) +--import qualified Network.IRC as IRC +import Network.HTTP.Base (catchIO,catchIO_) +import Control.Concurrent.STM +import Control.Concurrent +import Control.Monad +import Control.Monad.Fix +import System.IO +import System.Directory (getAppUserDataDirectory) +import Text.Printf (printf) +import System.FilePath (()) +import Control.Concurrent.STM.TBMQueue +import Control.Monad.Loops +import KikiD.Multiplex (pipeTransHookMicroseconds) +import Control.Exception +import Control.Concurrent.Async +import Data.Serialize + +import Control.Arrow (second) +--import qualified Merv.GetLine as MG + +{-instance Serialize IRC.Message where + put = putByteString . IRC.encode + get = do + x <- MG.getLine + case IRC.decode x of + Just x -> return x + Nothing -> fail ("IRC PARSE ERROR:'" <> B.unpack x <> "'") + + +createIRCPortListener :: PortNumber -> B.ByteString -> Int -> Int -> Int + -> TBMQueue (ThreadId,TBMQueue IRC.Message) -> TBMQueue IRC.Message -> IO () +createIRCPortListener port name delay qsize maxconns postNewTChans outq = + createTCPPortListener port name delay qsize maxconns postNewTChans outq ircReact + +-} + +createTCPPortListener :: Serialize a => PortNumber -> B.ByteString -> Int -> Int -> Int + -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a + -> (Handle -> TBMQueue a -> IO ()) -> IO () +createTCPPortListener port name delay qsize maxconns postNewTChans outq react = + bracket + -- aquire resources + (socket AF_INET Stream 0) + + -- release resources + sClose + + -- operate on resources + (\sock -> do + -- make socket immediately reusable - eases debugging. + setSocketOption sock ReuseAddr 1 + -- listen on TCP port 4242 + bindSocket sock (SockAddrInet port iNADDR_ANY) + -- allow a maximum of 15 outstanding connections + listen sock maxconns + sockAcceptLoop sock name delay qsize postNewTChans outq react + ) + +sockAcceptLoop :: Serialize a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a + -> (Handle -> TBMQueue a -> IO ()) -> IO () +sockAcceptLoop listenSock name delay qsize postNewTChans outq react = + whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do + -- accept one connection and handle it + conn@(sock,_) <- accept listenSock + async $ bracket (do + -- acquire resources + hdl <- socketToHandle sock ReadWriteMode + q <- atomically $ newTBMQueue qsize + thisChildOut <- atomically $ newTBMQueue qsize + async1 <- async (runConn hdl name q thisChildOut delay react) + async2 <- async (pipeTransHookMicroseconds thisChildOut outq 5000 + (\() -> Just) -- no translation on outgoing + (\m -> return ())) + return (hdl,q,thisChildOut,(async1,async2)) + ) + -- release resources + (\(hdl,q,thisChildOut,(async1,async2)) -> do + cancel async1 + cancel async2 + atomically $ closeTBMQueue q + atomically $ closeTBMQueue thisChildOut + hClose hdl + ) + -- run opration on async + (\(_,q,_,(async1,async2)) -> do + let tid = asyncThreadId async1 + atomically $ writeTBMQueue postNewTChans (tid,q) + --link2 async1 async2 -- Do I need this? + waitBoth async1 async2 + ) + +runConn :: Serialize a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int + -> (Handle -> TBMQueue a -> IO ()) -> IO () +runConn hdl name q outq delay react = do + --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) + -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) + -- OnConnect Message... + + race_ + -- continuously read q and output to handle (socket) + -- to terminate thread, close q + (do + let pending = fmap not (atomically $ isEmptyTBMQueue q) + closed = atomically . isClosedTBMQueue $ q + whileM_ (fmap not closed) $ do + whileM_ pending $ do + m <- atomically (readTBMQueue q) + case m of + Just m -> B.hPutStrLn hdl (encode m) + -- Nothing means the Queue is closed and empty, so dont loop + Nothing -> return () + threadDelay delay + --B.hPutStrLn hdl (encode (quit (Just "Bye!")) ) + ) + + -- continuously input from handle and + -- send to provided outq + (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq ) + + +{- +ircReact hdl outq = do + line <- B.hGetLine hdl + -- debugging + dir <- getAppUserDataDirectory "merv" + tid <- myThreadId + let bQuit = (B.isPrefixOf "/quit") line + appendFile (dir "xdebug") + (printf "%s:%s\n(bQuit=%s) %s\n" (show tid) (show line) (show bQuit) (show $ IRC.parseMessage line)) + -- end debugging + case IRC.decode line of + Just (IRC.msg_command -> "QUIT") -> atomically $ closeTBMQueue outq + Just m -> atomically $ writeTBMQueue outq m + Nothing | "/q" `B.isPrefixOf` line -> atomically $ closeTBMQueue outq + _ -> return undefined +-} diff --git a/kiki.cabal b/kiki.cabal index f811743..2632e6d 100644 --- a/kiki.cabal +++ b/kiki.cabal @@ -13,7 +13,9 @@ build-type: Simple Executable kiki Main-is: kiki.hs - Build-Depends: base -any, directory -any, + -- base >=4.6 due to use of readEither in KikiD.Message + Build-Depends: base >=4.6.0.0, + directory -any, openpgp-util -any, crypto-pubkey (>=0.2.3), cryptohash -any, crypto-pubkey-types -any, @@ -31,5 +33,19 @@ Executable hosts Main-is: hosts.hs c-sources: dotlock.c +Executable kikid + Main-is: kikid.hs + Build-Depends: base -any, + --kiki >=0.0.3, + hdaemonize >= 0.5, + hsyslog -any, + async >= 2.0.0, + stm-chans >= 2.0.0, + network >= 2.4 && < 3.0, + monad-loops -any, + HTTP -any, + stm >= 2.3, + cereal -any + library exposed-modules: KeyRing diff --git a/kikid.hs b/kikid.hs new file mode 100644 index 0000000..31426a3 --- /dev/null +++ b/kikid.hs @@ -0,0 +1,107 @@ +{-# LANGUAGE OverloadedStrings #-} + +import System.Posix.Daemonize +import Control.Concurrent +import System.Posix.Syslog +import System.Posix.Signals +import System.Posix.User (getEffectiveUserID) +import KikiD.PortServer +import KikiD.Multiplex +import KikiD.Message +import Control.Concurrent.STM +import Control.Concurrent.STM.TBMQueue +import Control.Concurrent.Async +import Control.Monad +import Control.Monad.Loops +import Control.Exception +import Data.Monoid +import qualified Data.ByteString.Char8 as B +import Data.Serialize +import qualified Data.Map as M + +-- TODO: Set this in config file +port = 9800 +max_conns = 100 +qsize = 20 +qdelay = 5*10^5 + +doNothing = return () + +main = do + me <- getEffectiveUserID + if me == 0 then + serviced simpleDaemon { privilegedAction = startupAsRoot + , program = kikidMain + , user = Just "root", group = Just "root" } + else putStrLn "Kiki Daemon must be run as root." + +startupAsRoot = syslog Notice "kikid Started." + +kikidMain _ = do + refreshFlag <- atomically $ newTVar True + whileM_ (atomically $ readTVar refreshFlag) $ do + incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage) + newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage)) + currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState)) + let timeToQuit = atomically . isClosedTBMQueue $ newchans + installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.") + atomically $ writeTVar refreshFlag False + atomically $ closeTBMQueue newchans + atomically $ closeTBMQueue incomming)) (Just fullSignalSet) + installHandler sigHUP (Catch (do syslog Notice ("Caught sigHUP: Refreshing..") + atomically $ closeTBMQueue newchans + atomically $ closeTBMQueue incomming)) (Just fullSignalSet) + (_,ex) <- join . fmap waitAnyCatch $ mapM async + [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage + , addOpenConnections newchans currentClients + , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients) + , purgeClosedConnections timeToQuit currentClients + ] + case (ex::Either SomeException ()) of + Left e -> syslog Notice ("Exception: " <> show e) + Right _ -> doNothing + + atomically $ closeTBMQueue newchans + atomically $ closeTBMQueue incomming + +data ClientState = CState {cs_queue :: TBMQueue KikiDMessage} +type ClientID = ThreadId +threadIdToClient = id + +addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do + cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState) + whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do + x <- atomically $ readTBMQueue newchans + case x of + Just (newClientThread,clientQ) -> do -- Is clientQ input or output? + syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?") + atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ)) + Nothing -> doNothing + threadDelay qdelay + +purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do + map <- atomically $ readTVar currentClients + let k = M.keys map + e = M.elems map + closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e + let f False a b = [(a,b)] -- still open + f True _ _ = [] -- closed + closing = filter fst (zip closedClients k) + if (not . null $ closing) + then syslog Notice ("Closing connections: " ++ show closing) + else doNothing + atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e + threadDelay qdelay + +handleMessage hdl outq = do + line <- B.hGetLine hdl + tid <- myThreadId + case (decode line :: Either String KikiDMessage) of + Right _ -> + syslog Notice ("Message decoded on thread=" <> show tid) + Left str -> do + syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid) + syslog Notice str + +consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg) +-- TODO: Do more here... -- cgit v1.2.3