From d420aefe0b14081b00d4655f9e8317903b5b02c3 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sun, 21 Jun 2015 07:36:46 -0400 Subject: kikid: The Beginnings of the Kiki Daemon --- KikiD/GetLine.hs | 18 +++++++ KikiD/Message.hs | 18 +++++++ KikiD/Multiplex.hs | 100 ++++++++++++++++++++++++++++++++++++ KikiD/PortServer.hs | 145 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 281 insertions(+) create mode 100644 KikiD/GetLine.hs create mode 100644 KikiD/Message.hs create mode 100644 KikiD/Multiplex.hs create mode 100644 KikiD/PortServer.hs (limited to 'KikiD') diff --git a/KikiD/GetLine.hs b/KikiD/GetLine.hs new file mode 100644 index 0000000..8af5dc6 --- /dev/null +++ b/KikiD/GetLine.hs @@ -0,0 +1,18 @@ +module KikiD.GetLine where + +import Control.Monad +import Data.Serialize +import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as L +import Data.Monoid +import Data.Binary.Builder + +getLine :: Get BS.ByteString +getLine = getWords empty + where + getWords b = do + w <- getWord8 + let x = singleton w + if (w == 10 || w == 0) + then return $ BS.concat . L.toChunks . toLazyByteString $ b <> x + else getWords (b <> x) diff --git a/KikiD/Message.hs b/KikiD/Message.hs new file mode 100644 index 0000000..c4903cc --- /dev/null +++ b/KikiD/Message.hs @@ -0,0 +1,18 @@ +module KikiD.Message where + +import Data.Serialize +import qualified KikiD.GetLine +import qualified Data.ByteString.Char8 as B +import Data.Monoid +import Text.Read + +data KikiDMessage = TODO deriving (Show,Read) + +instance Serialize KikiDMessage where + put = putByteString . B.pack . show + get = do + x <- KikiD.GetLine.getLine + case (readEither (B.unpack x) :: Either String KikiDMessage) of + Right m -> return m + Left er -> fail ("PARSE ERROR: " <> er) + diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs new file mode 100644 index 0000000..4a31127 --- /dev/null +++ b/KikiD/Multiplex.hs @@ -0,0 +1,100 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE BangPatterns #-} +module KikiD.Multiplex where + +import System.IO +import qualified Data.ByteString.Char8 as B +import Data.Monoid +import Control.Concurrent.STM +import Data.Map.Strict as M +import Control.Monad +import Control.Concurrent +import qualified Data.Binary as Bin +import Control.Concurrent.STM.TBMQueue +import Control.Monad.Loops +import Data.List +import Data.Maybe + +-- | pipeTransHookMicroseconds +-- +-- This function indefinitely reads the @fromChan@ queue and applies +-- the function @translate@ to the contents before passing it on to the +-- @toChan@ queue. The @triggerAction@ is performed on the message prior +-- to the translation. The @fromChan@ queue is checked every @micros@ +-- microseconds from the last emptying. +-- +-- To terminate the thread, close @fromChan@ queue. +-- +pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO () +pipeTransHookMicroseconds fromChan toChan micros translate triggerAction = + whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do + whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do + msg <- atomically $ readTBMQueue fromChan + case msg of + Just m' -> do + x <- triggerAction m' + case translate x m' of + Just m -> atomically $ writeTBMQueue toChan m + _ -> return () + _ -> return () + threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads + +pipeTransHook fromChan toChan translate triggerAction = + pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction + +pipeTrans fromChan toChan translate = + pipeTransHook fromChan toChan translate (void . return) + +pipeHook fromChan toChan triggerAction = + pipeTransHook fromChan toChan id triggerAction + +pipeQueue fromChan toChan = + pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return) + +teePipeQueueMicroseconds fromChan toChan1 toChan2 micros = + whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do + whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do + msg <- atomically $ readTBMQueue fromChan + case msg of + Just m -> do + atomically $ writeTBMQueue toChan1 m + atomically $ writeTBMQueue toChan2 m + _ -> return () + threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads + +teePipeQueue fromChan toChan1 toChan2 = + teePipeQueueMicroseconds fromChan toChan1 toChan2 5000 + + +-- Deprecated: Use consumeQueueMicroseconds +-- TODO: Remove this +withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do + whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do + t <- atomically $ readTBMQueue fromChan + case t of + Just x -> action x + Nothing -> return () + threadDelay delay + +{-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-} +withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action +{-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-} + +-- | consumeQueueMicroseconds +-- (as of version 1.0.4) +-- +-- Continously run the provided action on items +-- from the provided queue. Delay for provided +-- microseconds each time the queue is emptied. +consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do + whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do + x <- atomically $ readTBMQueue q + case x of + Just s -> action s + Nothing -> return () + threadDelay micros diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs new file mode 100644 index 0000000..31101a7 --- /dev/null +++ b/KikiD/PortServer.hs @@ -0,0 +1,145 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE BangPatterns #-} +module KikiD.PortServer (createTCPPortListener) where + +import qualified Data.ByteString.Char8 as B +import Network.Socket hiding (send) +import Network.Socket.ByteString +import Data.Monoid ((<>)) +--import qualified Network.IRC as IRC +import Network.HTTP.Base (catchIO,catchIO_) +import Control.Concurrent.STM +import Control.Concurrent +import Control.Monad +import Control.Monad.Fix +import System.IO +import System.Directory (getAppUserDataDirectory) +import Text.Printf (printf) +import System.FilePath (()) +import Control.Concurrent.STM.TBMQueue +import Control.Monad.Loops +import KikiD.Multiplex (pipeTransHookMicroseconds) +import Control.Exception +import Control.Concurrent.Async +import Data.Serialize + +import Control.Arrow (second) +--import qualified Merv.GetLine as MG + +{-instance Serialize IRC.Message where + put = putByteString . IRC.encode + get = do + x <- MG.getLine + case IRC.decode x of + Just x -> return x + Nothing -> fail ("IRC PARSE ERROR:'" <> B.unpack x <> "'") + + +createIRCPortListener :: PortNumber -> B.ByteString -> Int -> Int -> Int + -> TBMQueue (ThreadId,TBMQueue IRC.Message) -> TBMQueue IRC.Message -> IO () +createIRCPortListener port name delay qsize maxconns postNewTChans outq = + createTCPPortListener port name delay qsize maxconns postNewTChans outq ircReact + +-} + +createTCPPortListener :: Serialize a => PortNumber -> B.ByteString -> Int -> Int -> Int + -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a + -> (Handle -> TBMQueue a -> IO ()) -> IO () +createTCPPortListener port name delay qsize maxconns postNewTChans outq react = + bracket + -- aquire resources + (socket AF_INET Stream 0) + + -- release resources + sClose + + -- operate on resources + (\sock -> do + -- make socket immediately reusable - eases debugging. + setSocketOption sock ReuseAddr 1 + -- listen on TCP port 4242 + bindSocket sock (SockAddrInet port iNADDR_ANY) + -- allow a maximum of 15 outstanding connections + listen sock maxconns + sockAcceptLoop sock name delay qsize postNewTChans outq react + ) + +sockAcceptLoop :: Serialize a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a + -> (Handle -> TBMQueue a -> IO ()) -> IO () +sockAcceptLoop listenSock name delay qsize postNewTChans outq react = + whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do + -- accept one connection and handle it + conn@(sock,_) <- accept listenSock + async $ bracket (do + -- acquire resources + hdl <- socketToHandle sock ReadWriteMode + q <- atomically $ newTBMQueue qsize + thisChildOut <- atomically $ newTBMQueue qsize + async1 <- async (runConn hdl name q thisChildOut delay react) + async2 <- async (pipeTransHookMicroseconds thisChildOut outq 5000 + (\() -> Just) -- no translation on outgoing + (\m -> return ())) + return (hdl,q,thisChildOut,(async1,async2)) + ) + -- release resources + (\(hdl,q,thisChildOut,(async1,async2)) -> do + cancel async1 + cancel async2 + atomically $ closeTBMQueue q + atomically $ closeTBMQueue thisChildOut + hClose hdl + ) + -- run opration on async + (\(_,q,_,(async1,async2)) -> do + let tid = asyncThreadId async1 + atomically $ writeTBMQueue postNewTChans (tid,q) + --link2 async1 async2 -- Do I need this? + waitBoth async1 async2 + ) + +runConn :: Serialize a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int + -> (Handle -> TBMQueue a -> IO ()) -> IO () +runConn hdl name q outq delay react = do + --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) + -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) + -- OnConnect Message... + + race_ + -- continuously read q and output to handle (socket) + -- to terminate thread, close q + (do + let pending = fmap not (atomically $ isEmptyTBMQueue q) + closed = atomically . isClosedTBMQueue $ q + whileM_ (fmap not closed) $ do + whileM_ pending $ do + m <- atomically (readTBMQueue q) + case m of + Just m -> B.hPutStrLn hdl (encode m) + -- Nothing means the Queue is closed and empty, so dont loop + Nothing -> return () + threadDelay delay + --B.hPutStrLn hdl (encode (quit (Just "Bye!")) ) + ) + + -- continuously input from handle and + -- send to provided outq + (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq ) + + +{- +ircReact hdl outq = do + line <- B.hGetLine hdl + -- debugging + dir <- getAppUserDataDirectory "merv" + tid <- myThreadId + let bQuit = (B.isPrefixOf "/quit") line + appendFile (dir "xdebug") + (printf "%s:%s\n(bQuit=%s) %s\n" (show tid) (show line) (show bQuit) (show $ IRC.parseMessage line)) + -- end debugging + case IRC.decode line of + Just (IRC.msg_command -> "QUIT") -> atomically $ closeTBMQueue outq + Just m -> atomically $ writeTBMQueue outq m + Nothing | "/q" `B.isPrefixOf` line -> atomically $ closeTBMQueue outq + _ -> return undefined +-} -- cgit v1.2.3