From 4ec9cc5e6e1c71184c0537fb2fbd4387f27b3ac2 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Thu, 28 Apr 2016 17:25:01 -0400 Subject: remove kikid, moved to separate repo --- Codec/LineReady.hs | 23 ----------- KikiD/ClientState.hs | 14 ------- KikiD/Message.hs | 42 ------------------- KikiD/Multiplex.hs | 100 -------------------------------------------- KikiD/PortServer.hs | 114 --------------------------------------------------- kikid.hs | 112 -------------------------------------------------- 6 files changed, 405 deletions(-) delete mode 100644 Codec/LineReady.hs delete mode 100644 KikiD/ClientState.hs delete mode 100644 KikiD/Message.hs delete mode 100644 KikiD/Multiplex.hs delete mode 100644 KikiD/PortServer.hs delete mode 100644 kikid.hs diff --git a/Codec/LineReady.hs b/Codec/LineReady.hs deleted file mode 100644 index a6961ca..0000000 --- a/Codec/LineReady.hs +++ /dev/null @@ -1,23 +0,0 @@ -module Codec.LineReady where - -import qualified Data.ByteString.Char8 as B -import Data.Monoid -import Data.List (foldl') -import Data.Maybe - -toLineReady :: B.ByteString -> B.ByteString -toLineReady blob = - let as = zip [0..] (B.unpack blob) - bs = filter ((=='\n') . snd) as - is = map fst bs - in B.pack (show is) <> foldl' (replaceCharStrIndex '#') blob is - -replaceCharStrIndex :: Char -> B.ByteString -> Int -> B.ByteString -replaceCharStrIndex c str i = a <> B.singleton c <> B.drop 1 b - where (a,b) = B.splitAt i str - -fromLineReady :: B.ByteString -> B.ByteString -fromLineReady str = foldl' (replaceCharStrIndex '\n') (B.drop 1 str') is - where is = map fst . mapMaybe B.readInt $ - B.groupBy (\c d -> (c/=',')&&(d/=',')) ls - (ls,str') = B.break (==']') (B.drop 1 str) diff --git a/KikiD/ClientState.hs b/KikiD/ClientState.hs deleted file mode 100644 index a80a392..0000000 --- a/KikiD/ClientState.hs +++ /dev/null @@ -1,14 +0,0 @@ -module KikiD.ClientState where - -import KikiD.Message -import Control.Concurrent.STM.TBMQueue -import Control.Concurrent - -data ClientState = CState {cliQueue :: TBMQueue KikiDMessage} - -mkClient = CState - { cliQueue = error "ERROR CState: cliQueue parameter is required" - } - -type ClientID = ThreadId -threadIdToClient = id diff --git a/KikiD/Message.hs b/KikiD/Message.hs deleted file mode 100644 index efefdc6..0000000 --- a/KikiD/Message.hs +++ /dev/null @@ -1,42 +0,0 @@ -{-# LANGUAGE DoAndIfThenElse #-} -module KikiD.Message where - -import Data.Serialize as Cereal -import qualified Data.ByteString.Char8 as B -import Data.Monoid -import Text.Read -import Data.Char (ord,chr) -import Control.Monad -import Data.Bytes.Serial as R -import Data.Bytes.Put as Put -import Data.Bytes.Get as Get -import Codec.LineReady -import Control.Monad.Loops -import Data.Word - -data KikiDMessage = TODO deriving (Show,Read) - -instance Serialize KikiDMessage where - put m = mapM_ (Cereal.putWord8 . fromIntegral . ord) "TO\nO" - -- putByteString . B.pack $ show m ++ "\n" - get = do - t <- Cereal.getWord8 - o <- Cereal.getWord8 - d <- Cereal.getWord8 - o <- Cereal.getWord8 - let s = map (chr . fromIntegral) [t,o,d,o] - if "TO\nO" == s - then return TODO - else fail ("Could not decode message: " ++ show s) - -instance Serial KikiDMessage where - serialize m = Put.putByteString . toLineReady . Cereal.encode $ m - deserialize = do - xs <- unfoldM $ do - flag <- Get.isEmpty - if flag then return Nothing else do - c <- fmap (chr . fromIntegral) Get.getWord8 - if (c == '\n') then return Nothing else return (Just c) - case (Cereal.decode . fromLineReady $ B.pack xs) of - Left str -> fail str - Right x -> return x diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs deleted file mode 100644 index 4a31127..0000000 --- a/KikiD/Multiplex.hs +++ /dev/null @@ -1,100 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE ViewPatterns #-} -{-# LANGUAGE TupleSections #-} -{-# LANGUAGE StandaloneDeriving #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE BangPatterns #-} -module KikiD.Multiplex where - -import System.IO -import qualified Data.ByteString.Char8 as B -import Data.Monoid -import Control.Concurrent.STM -import Data.Map.Strict as M -import Control.Monad -import Control.Concurrent -import qualified Data.Binary as Bin -import Control.Concurrent.STM.TBMQueue -import Control.Monad.Loops -import Data.List -import Data.Maybe - --- | pipeTransHookMicroseconds --- --- This function indefinitely reads the @fromChan@ queue and applies --- the function @translate@ to the contents before passing it on to the --- @toChan@ queue. The @triggerAction@ is performed on the message prior --- to the translation. The @fromChan@ queue is checked every @micros@ --- microseconds from the last emptying. --- --- To terminate the thread, close @fromChan@ queue. --- -pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO () -pipeTransHookMicroseconds fromChan toChan micros translate triggerAction = - whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do - whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do - msg <- atomically $ readTBMQueue fromChan - case msg of - Just m' -> do - x <- triggerAction m' - case translate x m' of - Just m -> atomically $ writeTBMQueue toChan m - _ -> return () - _ -> return () - threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads - -pipeTransHook fromChan toChan translate triggerAction = - pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction - -pipeTrans fromChan toChan translate = - pipeTransHook fromChan toChan translate (void . return) - -pipeHook fromChan toChan triggerAction = - pipeTransHook fromChan toChan id triggerAction - -pipeQueue fromChan toChan = - pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return) - -teePipeQueueMicroseconds fromChan toChan1 toChan2 micros = - whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do - whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do - msg <- atomically $ readTBMQueue fromChan - case msg of - Just m -> do - atomically $ writeTBMQueue toChan1 m - atomically $ writeTBMQueue toChan2 m - _ -> return () - threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads - -teePipeQueue fromChan toChan1 toChan2 = - teePipeQueueMicroseconds fromChan toChan1 toChan2 5000 - - --- Deprecated: Use consumeQueueMicroseconds --- TODO: Remove this -withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do - whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do - t <- atomically $ readTBMQueue fromChan - case t of - Just x -> action x - Nothing -> return () - threadDelay delay - -{-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-} -withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action -{-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-} - --- | consumeQueueMicroseconds --- (as of version 1.0.4) --- --- Continously run the provided action on items --- from the provided queue. Delay for provided --- microseconds each time the queue is emptied. -consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do - whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do - x <- atomically $ readTBMQueue q - case x of - Just s -> action s - Nothing -> return () - threadDelay micros diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs deleted file mode 100644 index b42e340..0000000 --- a/KikiD/PortServer.hs +++ /dev/null @@ -1,114 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE ViewPatterns #-} -{-# LANGUAGE BangPatterns #-} -module KikiD.PortServer (createTCPPortListener) where - -import qualified Data.ByteString.Char8 as B -import Network.Socket hiding (send) -import Network.Socket.ByteString -import Data.Monoid ((<>)) ---import qualified Network.IRC as IRC -import Network.HTTP.Base (catchIO,catchIO_) -import Control.Concurrent.STM -import Control.Concurrent -import Control.Monad -import Control.Monad.Fix -import System.IO -import System.Directory (getAppUserDataDirectory) -import Text.Printf (printf) -import System.FilePath (()) -import Control.Concurrent.STM.TBMQueue -import Control.Monad.Loops -import KikiD.Multiplex (pipeTransHookMicroseconds) -import Control.Exception -import Control.Concurrent.Async -import Data.Bytes.Serial as R -import Data.Bytes.Put as Put - -import Control.Arrow (second) - - -createTCPPortListener :: Serial a => PortNumber -> B.ByteString -> Int -> Int -> Int - -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a - -> (Handle -> TBMQueue a -> IO ()) -> IO () -createTCPPortListener port name delay qsize maxconns postNewTChans outq react = - bracket - -- aquire resources - (socket AF_INET Stream 0) - - -- release resources - sClose - - -- operate on resources - (\sock -> do - -- make socket immediately reusable - eases debugging. - setSocketOption sock ReuseAddr 1 - -- listen on TCP port 4242 - bindSocket sock (SockAddrInet port iNADDR_ANY) - -- allow a maximum of 15 outstanding connections - listen sock maxconns - sockAcceptLoop sock name delay qsize postNewTChans outq react - ) - -sockAcceptLoop :: Serial a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a - -> (Handle -> TBMQueue a -> IO ()) -> IO () -sockAcceptLoop listenSock name delay qsize postNewTChans outq react = - whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do - -- accept one connection and handle it - conn@(sock,_) <- accept listenSock - async $ bracket (do - -- acquire resources - hdl <- socketToHandle sock ReadWriteMode - q <- atomically $ newTBMQueue qsize - thisChildOut <- atomically $ newTBMQueue qsize - async1 <- async (runConn hdl name q thisChildOut delay react) - async2 <- async (pipeTransHookMicroseconds thisChildOut outq 5000 - (\() -> Just) -- no translation on outgoing - (\m -> return ())) - return (hdl,q,thisChildOut,(async1,async2)) - ) - -- release resources - (\(hdl,q,thisChildOut,(async1,async2)) -> do - cancel async1 - cancel async2 - atomically $ closeTBMQueue q - atomically $ closeTBMQueue thisChildOut - hClose hdl - ) - -- run opration on async - (\(_,q,_,(async1,async2)) -> do - let tid = asyncThreadId async1 - atomically $ writeTBMQueue postNewTChans (tid,q) - --link2 async1 async2 -- Do I need this? - waitBoth async1 async2 - ) - -runConn :: Serial a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int - -> (Handle -> TBMQueue a -> IO ()) -> IO () -runConn hdl name q outq delay react = do - --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) - -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) - -- OnConnect Message... - - race_ - -- continuously read q and output to handle (socket) - -- to terminate thread, close q - (do - let pending = fmap not (atomically $ isEmptyTBMQueue q) - closed = atomically . isClosedTBMQueue $ q - whileM_ (fmap not closed) $ do - whileM_ pending $ do - m <- atomically (readTBMQueue q) - case m of - Just m -> B.hPutStrLn hdl (runPutS $ R.serialize m) - -- Nothing means the Queue is closed and empty, so dont loop - Nothing -> return () - threadDelay delay - --B.hPutStrLn hdl (encode (quit (Just "Bye!")) ) - ) - - -- continuously input from handle and - -- send to provided outq - (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq ) - - diff --git a/kikid.hs b/kikid.hs deleted file mode 100644 index 059a5df..0000000 --- a/kikid.hs +++ /dev/null @@ -1,112 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE DoAndIfThenElse #-} - -import System.Posix.Daemonize -import Control.Concurrent -import System.Posix.Syslog -import System.Posix.Signals -import System.Posix.User (getEffectiveUserID) - -import Control.Concurrent.STM -import Control.Concurrent.STM.TBMQueue -import Control.Concurrent.Async -import Control.Monad -import Control.Monad.Loops -import Control.Exception -import Data.Monoid -import qualified Data.ByteString.Char8 as B ---import Data.Serialize -import qualified Data.Map as M -import qualified Data.Bytes.Serial as Bytes -import qualified Data.Bytes.Get as Bytes -import qualified Data.Bytes.Put as Bytes - -import KikiD.PortServer -import KikiD.Multiplex -import KikiD.Message -import KikiD.ClientState - --- TODO: Set this in config file -port = 9800 -max_conns = 100 -qsize = 20 -qdelay = 5*10^5 - -doNothing = return () - -main = do - me <- getEffectiveUserID - if me == 0 then - serviced simpleDaemon { privilegedAction = startupAsRoot - , program = kikidMain - , user = Just "root", group = Just "root" } - else putStrLn "Kiki Daemon must be run as root." - -startupAsRoot = syslog Notice "kikid Started." - -kikidMain _ = do - refreshFlag <- atomically $ newTVar True - whileM_ (atomically $ readTVar refreshFlag) $ do - incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage) - newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage)) - currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState)) - let timeToQuit = atomically . isClosedTBMQueue $ newchans - installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.") - atomically $ writeTVar refreshFlag False - atomically $ closeTBMQueue newchans - atomically $ closeTBMQueue incomming)) (Just fullSignalSet) - installHandler sigHUP (Catch (do syslog Notice ("Caught sigHUP: Refreshing..") - atomically $ closeTBMQueue newchans - atomically $ closeTBMQueue incomming)) (Just fullSignalSet) - (_,ex) <- join . fmap waitAnyCatch $ mapM async - [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage - , addOpenConnections newchans currentClients - , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients) - , purgeClosedConnections timeToQuit currentClients - ] - case (ex::Either SomeException ()) of - Left e -> syslog Notice ("Exception: " <> show e) - Right _ -> doNothing - - atomically $ closeTBMQueue newchans - atomically $ closeTBMQueue incomming - -addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do - cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState) - whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do - x <- atomically $ readTBMQueue newchans - case x of - Just (newClientThread,clientQ) -> do -- Is clientQ input or output? - syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?") - atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ)) - Nothing -> doNothing - threadDelay qdelay - -purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do - map <- atomically $ readTVar currentClients - let k = M.keys map - e = M.elems map - closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e - let f False a b = [(a,b)] -- still open - f True _ _ = [] -- closed - closing = filter fst (zip closedClients k) - if (not . null $ closing) - then syslog Notice ("Closing connections: " ++ show closing) - else doNothing - atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e - threadDelay qdelay - -handleMessage hdl outq = do - line <- B.hGetLine hdl - tid <- myThreadId - case (Bytes.runGetS Bytes.deserialize line :: Either String KikiDMessage) of - Right msg -> do - syslog Notice ("Message decoded on thread=" <> show tid) - syslog Notice ("Message: " <> show msg) - Left str -> do - syslog Notice ("ERROR: " <> show line) - syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid) - syslog Notice ("ERROR: " ++ str) - -consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg) --- TODO: Do more here... -- cgit v1.2.3