From 2966db997f43c063389285ddc40579acad5c6a29 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Mon, 22 Jun 2015 18:58:47 -0400 Subject: kikid: Serialization... --- Codec/LineReady.hs | 23 +++++++++++++++++++++++ Codec/SafeBlob.hs | 22 ---------------------- KikiD/GetLine.hs | 18 ------------------ KikiD/Message.hs | 35 +++++++++++++++++++++++++---------- KikiD/PortServer.hs | 43 ++++++------------------------------------- kiki.cabal | 3 ++- kikid.hs | 15 ++++++++++----- 7 files changed, 66 insertions(+), 93 deletions(-) create mode 100644 Codec/LineReady.hs delete mode 100644 Codec/SafeBlob.hs delete mode 100644 KikiD/GetLine.hs diff --git a/Codec/LineReady.hs b/Codec/LineReady.hs new file mode 100644 index 0000000..ca2cde3 --- /dev/null +++ b/Codec/LineReady.hs @@ -0,0 +1,23 @@ +module Codec.LineReady where + +import qualified Data.ByteString.Char8 as B +import Data.Monoid +import Data.List (foldl') +import Data.Maybe + +toLineReady :: B.ByteString -> B.ByteString +toLineReady blob = + let as = zip [0..] (B.unpack blob) + bs = filter ((=='\n') . snd) as + is = map fst bs + in B.pack (show is) <> foldl' (replaceCharStrIndex '#') blob is <> B.singleton '\n' + +replaceCharStrIndex :: Char -> B.ByteString -> Int -> B.ByteString +replaceCharStrIndex c str i = a <> B.singleton c <> B.drop 1 b + where (a,b) = B.splitAt i str + +fromLineReady :: B.ByteString -> B.ByteString +fromLineReady str = foldl' (replaceCharStrIndex '\n') (B.drop 1 str') is + where is = map fst . mapMaybe B.readInt $ + B.groupBy (\c d -> (c/=',')&&(d/=',')) ls + (ls,str') = B.break (==']') (B.tail str) diff --git a/Codec/SafeBlob.hs b/Codec/SafeBlob.hs deleted file mode 100644 index a6db80e..0000000 --- a/Codec/SafeBlob.hs +++ /dev/null @@ -1,22 +0,0 @@ -{-# LANGUAGE ViewPatterns #-} -module Codec.SafeBlob where - -import qualified Data.ByteString.Char8 as B -import Data.Monoid -import Data.List (foldl') -import Data.Maybe - -toSafe :: B.ByteString -> B.ByteString -toSafe blob = let as = zip [0..] (B.unpack blob) - bs = filter ((=='\n') . snd) as - is = map fst bs - in B.pack (show is) <> foldl' (replaceCharStrIndex '#') blob is - -replaceCharStrIndex :: Char -> B.ByteString -> Int -> B.ByteString -replaceCharStrIndex c str i = a <> B.singleton c <> b - where (a,B.uncons -> Just (_,b)) = B.splitAt i str - -fromSafe str = foldl' (replaceCharStrIndex '\n') (B.drop 1 str') is - where is = map fst . mapMaybe B.readInt $ - B.groupBy (\c d -> (c/=',')&&(d/=',')) ls - (ls,str') = B.break (==']') (B.tail str) diff --git a/KikiD/GetLine.hs b/KikiD/GetLine.hs deleted file mode 100644 index 8af5dc6..0000000 --- a/KikiD/GetLine.hs +++ /dev/null @@ -1,18 +0,0 @@ -module KikiD.GetLine where - -import Control.Monad -import Data.Serialize -import qualified Data.ByteString as BS -import qualified Data.ByteString.Lazy as L -import Data.Monoid -import Data.Binary.Builder - -getLine :: Get BS.ByteString -getLine = getWords empty - where - getWords b = do - w <- getWord8 - let x = singleton w - if (w == 10 || w == 0) - then return $ BS.concat . L.toChunks . toLazyByteString $ b <> x - else getWords (b <> x) diff --git a/KikiD/Message.hs b/KikiD/Message.hs index 5b642f3..cd3ee71 100644 --- a/KikiD/Message.hs +++ b/KikiD/Message.hs @@ -1,22 +1,37 @@ +{-# LANGUAGE DoAndIfThenElse #-} module KikiD.Message where -import Data.Serialize -import qualified KikiD.GetLine +import Data.Serialize as Cereal import qualified Data.ByteString.Char8 as B import Data.Monoid import Text.Read -import Data.Char (ord) +import Data.Char (ord,chr) import Control.Monad +import Data.Bytes.Serial as R +import Data.Bytes.Put as Put +import Data.Bytes.Get as Get +import Codec.LineReady +import Control.Monad.Loops data KikiDMessage = TODO deriving (Show,Read) instance Serialize KikiDMessage where - put m = mapM_ (putWord8 . fromIntegral . ord) "TODO" + put m = mapM_ (Cereal.putWord8 . fromIntegral . ord) "TO\nO" -- putByteString . B.pack $ show m ++ "\n" get = do - t <- getWord8 - o <- getWord8 - d <- getWord8 - o <- getWord8 - return TODO - + t <- Cereal.getWord8 + o <- Cereal.getWord8 + d <- Cereal.getWord8 + o <- Cereal.getWord8 + let s = map (chr . fromIntegral) [t,o,d,o] + if "TO\nO" == s + then return TODO + else fail ("Could not decode message: " ++ show s) + +instance Serial KikiDMessage where + serialize m = Put.putByteString . toLineReady . Cereal.encode $ m + deserialize = do + xs <- unfoldWhileM (/= '\n') (fmap (chr . fromIntegral) Get.getWord8) + case (Cereal.decode . fromLineReady $ B.pack xs) of + Left str -> fail str + Right x -> return x diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs index 31101a7..b42e340 100644 --- a/KikiD/PortServer.hs +++ b/KikiD/PortServer.hs @@ -22,28 +22,13 @@ import Control.Monad.Loops import KikiD.Multiplex (pipeTransHookMicroseconds) import Control.Exception import Control.Concurrent.Async -import Data.Serialize +import Data.Bytes.Serial as R +import Data.Bytes.Put as Put import Control.Arrow (second) ---import qualified Merv.GetLine as MG -{-instance Serialize IRC.Message where - put = putByteString . IRC.encode - get = do - x <- MG.getLine - case IRC.decode x of - Just x -> return x - Nothing -> fail ("IRC PARSE ERROR:'" <> B.unpack x <> "'") - -createIRCPortListener :: PortNumber -> B.ByteString -> Int -> Int -> Int - -> TBMQueue (ThreadId,TBMQueue IRC.Message) -> TBMQueue IRC.Message -> IO () -createIRCPortListener port name delay qsize maxconns postNewTChans outq = - createTCPPortListener port name delay qsize maxconns postNewTChans outq ircReact - --} - -createTCPPortListener :: Serialize a => PortNumber -> B.ByteString -> Int -> Int -> Int +createTCPPortListener :: Serial a => PortNumber -> B.ByteString -> Int -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a -> (Handle -> TBMQueue a -> IO ()) -> IO () createTCPPortListener port name delay qsize maxconns postNewTChans outq react = @@ -65,7 +50,7 @@ createTCPPortListener port name delay qsize maxconns postNewTChans outq react = sockAcceptLoop sock name delay qsize postNewTChans outq react ) -sockAcceptLoop :: Serialize a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a +sockAcceptLoop :: Serial a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a -> (Handle -> TBMQueue a -> IO ()) -> IO () sockAcceptLoop listenSock name delay qsize postNewTChans outq react = whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do @@ -98,7 +83,7 @@ sockAcceptLoop listenSock name delay qsize postNewTChans outq react = waitBoth async1 async2 ) -runConn :: Serialize a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int +runConn :: Serial a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int -> (Handle -> TBMQueue a -> IO ()) -> IO () runConn hdl name q outq delay react = do --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) @@ -115,7 +100,7 @@ runConn hdl name q outq delay react = do whileM_ pending $ do m <- atomically (readTBMQueue q) case m of - Just m -> B.hPutStrLn hdl (encode m) + Just m -> B.hPutStrLn hdl (runPutS $ R.serialize m) -- Nothing means the Queue is closed and empty, so dont loop Nothing -> return () threadDelay delay @@ -127,19 +112,3 @@ runConn hdl name q outq delay react = do (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq ) -{- -ircReact hdl outq = do - line <- B.hGetLine hdl - -- debugging - dir <- getAppUserDataDirectory "merv" - tid <- myThreadId - let bQuit = (B.isPrefixOf "/quit") line - appendFile (dir "xdebug") - (printf "%s:%s\n(bQuit=%s) %s\n" (show tid) (show line) (show bQuit) (show $ IRC.parseMessage line)) - -- end debugging - case IRC.decode line of - Just (IRC.msg_command -> "QUIT") -> atomically $ closeTBMQueue outq - Just m -> atomically $ writeTBMQueue outq m - Nothing | "/q" `B.isPrefixOf` line -> atomically $ closeTBMQueue outq - _ -> return undefined --} diff --git a/kiki.cabal b/kiki.cabal index 2632e6d..450a3ab 100644 --- a/kiki.cabal +++ b/kiki.cabal @@ -45,7 +45,8 @@ Executable kikid monad-loops -any, HTTP -any, stm >= 2.3, - cereal -any + cereal -any, + bytes -any library exposed-modules: KeyRing diff --git a/kikid.hs b/kikid.hs index 31426a3..04e06d3 100644 --- a/kikid.hs +++ b/kikid.hs @@ -16,8 +16,11 @@ import Control.Monad.Loops import Control.Exception import Data.Monoid import qualified Data.ByteString.Char8 as B -import Data.Serialize +--import Data.Serialize import qualified Data.Map as M +import qualified Data.Bytes.Serial as Bytes +import qualified Data.Bytes.Get as Bytes +--import qualified Data.Bytes.Put as Bytes -- TODO: Set this in config file port = 9800 @@ -64,7 +67,7 @@ kikidMain _ = do atomically $ closeTBMQueue newchans atomically $ closeTBMQueue incomming -data ClientState = CState {cs_queue :: TBMQueue KikiDMessage} +data ClientState = CState {csQueue :: TBMQueue KikiDMessage} type ClientID = ThreadId threadIdToClient = id @@ -96,12 +99,14 @@ purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do handleMessage hdl outq = do line <- B.hGetLine hdl tid <- myThreadId - case (decode line :: Either String KikiDMessage) of - Right _ -> + case (Bytes.runGetS Bytes.deserialize (B.snoc line '\n') :: Either String KikiDMessage) of + Right msg -> do syslog Notice ("Message decoded on thread=" <> show tid) + syslog Notice ("Message: " <> show msg) Left str -> do + syslog Notice ("ERROR: " <> show line) syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid) - syslog Notice str + syslog Notice ("ERROR: " ++ str) consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg) -- TODO: Do more here... -- cgit v1.2.3