diff options
-rw-r--r-- | Codec/LineReady.hs | 4 | ||||
-rw-r--r-- | KikiD/ClientState.hs | 14 | ||||
-rw-r--r-- | KikiD/Message.hs | 7 | ||||
-rw-r--r-- | kikid.hs | 18 |
4 files changed, 31 insertions, 12 deletions
diff --git a/Codec/LineReady.hs b/Codec/LineReady.hs index ca2cde3..a6961ca 100644 --- a/Codec/LineReady.hs +++ b/Codec/LineReady.hs | |||
@@ -10,7 +10,7 @@ toLineReady blob = | |||
10 | let as = zip [0..] (B.unpack blob) | 10 | let as = zip [0..] (B.unpack blob) |
11 | bs = filter ((=='\n') . snd) as | 11 | bs = filter ((=='\n') . snd) as |
12 | is = map fst bs | 12 | is = map fst bs |
13 | in B.pack (show is) <> foldl' (replaceCharStrIndex '#') blob is <> B.singleton '\n' | 13 | in B.pack (show is) <> foldl' (replaceCharStrIndex '#') blob is |
14 | 14 | ||
15 | replaceCharStrIndex :: Char -> B.ByteString -> Int -> B.ByteString | 15 | replaceCharStrIndex :: Char -> B.ByteString -> Int -> B.ByteString |
16 | replaceCharStrIndex c str i = a <> B.singleton c <> B.drop 1 b | 16 | replaceCharStrIndex c str i = a <> B.singleton c <> B.drop 1 b |
@@ -20,4 +20,4 @@ fromLineReady :: B.ByteString -> B.ByteString | |||
20 | fromLineReady str = foldl' (replaceCharStrIndex '\n') (B.drop 1 str') is | 20 | fromLineReady str = foldl' (replaceCharStrIndex '\n') (B.drop 1 str') is |
21 | where is = map fst . mapMaybe B.readInt $ | 21 | where is = map fst . mapMaybe B.readInt $ |
22 | B.groupBy (\c d -> (c/=',')&&(d/=',')) ls | 22 | B.groupBy (\c d -> (c/=',')&&(d/=',')) ls |
23 | (ls,str') = B.break (==']') (B.tail str) | 23 | (ls,str') = B.break (==']') (B.drop 1 str) |
diff --git a/KikiD/ClientState.hs b/KikiD/ClientState.hs new file mode 100644 index 0000000..a80a392 --- /dev/null +++ b/KikiD/ClientState.hs | |||
@@ -0,0 +1,14 @@ | |||
1 | module KikiD.ClientState where | ||
2 | |||
3 | import KikiD.Message | ||
4 | import Control.Concurrent.STM.TBMQueue | ||
5 | import Control.Concurrent | ||
6 | |||
7 | data ClientState = CState {cliQueue :: TBMQueue KikiDMessage} | ||
8 | |||
9 | mkClient = CState | ||
10 | { cliQueue = error "ERROR CState: cliQueue parameter is required" | ||
11 | } | ||
12 | |||
13 | type ClientID = ThreadId | ||
14 | threadIdToClient = id | ||
diff --git a/KikiD/Message.hs b/KikiD/Message.hs index cd3ee71..efefdc6 100644 --- a/KikiD/Message.hs +++ b/KikiD/Message.hs | |||
@@ -12,6 +12,7 @@ import Data.Bytes.Put as Put | |||
12 | import Data.Bytes.Get as Get | 12 | import Data.Bytes.Get as Get |
13 | import Codec.LineReady | 13 | import Codec.LineReady |
14 | import Control.Monad.Loops | 14 | import Control.Monad.Loops |
15 | import Data.Word | ||
15 | 16 | ||
16 | data KikiDMessage = TODO deriving (Show,Read) | 17 | data KikiDMessage = TODO deriving (Show,Read) |
17 | 18 | ||
@@ -31,7 +32,11 @@ instance Serialize KikiDMessage where | |||
31 | instance Serial KikiDMessage where | 32 | instance Serial KikiDMessage where |
32 | serialize m = Put.putByteString . toLineReady . Cereal.encode $ m | 33 | serialize m = Put.putByteString . toLineReady . Cereal.encode $ m |
33 | deserialize = do | 34 | deserialize = do |
34 | xs <- unfoldWhileM (/= '\n') (fmap (chr . fromIntegral) Get.getWord8) | 35 | xs <- unfoldM $ do |
36 | flag <- Get.isEmpty | ||
37 | if flag then return Nothing else do | ||
38 | c <- fmap (chr . fromIntegral) Get.getWord8 | ||
39 | if (c == '\n') then return Nothing else return (Just c) | ||
35 | case (Cereal.decode . fromLineReady $ B.pack xs) of | 40 | case (Cereal.decode . fromLineReady $ B.pack xs) of |
36 | Left str -> fail str | 41 | Left str -> fail str |
37 | Right x -> return x | 42 | Right x -> return x |
@@ -1,13 +1,12 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | 1 | {-# LANGUAGE OverloadedStrings #-} |
2 | {-# LANGUAGE DoAndIfThenElse #-} | ||
2 | 3 | ||
3 | import System.Posix.Daemonize | 4 | import System.Posix.Daemonize |
4 | import Control.Concurrent | 5 | import Control.Concurrent |
5 | import System.Posix.Syslog | 6 | import System.Posix.Syslog |
6 | import System.Posix.Signals | 7 | import System.Posix.Signals |
7 | import System.Posix.User (getEffectiveUserID) | 8 | import System.Posix.User (getEffectiveUserID) |
8 | import KikiD.PortServer | 9 | |
9 | import KikiD.Multiplex | ||
10 | import KikiD.Message | ||
11 | import Control.Concurrent.STM | 10 | import Control.Concurrent.STM |
12 | import Control.Concurrent.STM.TBMQueue | 11 | import Control.Concurrent.STM.TBMQueue |
13 | import Control.Concurrent.Async | 12 | import Control.Concurrent.Async |
@@ -20,7 +19,12 @@ import qualified Data.ByteString.Char8 as B | |||
20 | import qualified Data.Map as M | 19 | import qualified Data.Map as M |
21 | import qualified Data.Bytes.Serial as Bytes | 20 | import qualified Data.Bytes.Serial as Bytes |
22 | import qualified Data.Bytes.Get as Bytes | 21 | import qualified Data.Bytes.Get as Bytes |
23 | --import qualified Data.Bytes.Put as Bytes | 22 | import qualified Data.Bytes.Put as Bytes |
23 | |||
24 | import KikiD.PortServer | ||
25 | import KikiD.Multiplex | ||
26 | import KikiD.Message | ||
27 | import KikiD.ClientState | ||
24 | 28 | ||
25 | -- TODO: Set this in config file | 29 | -- TODO: Set this in config file |
26 | port = 9800 | 30 | port = 9800 |
@@ -67,10 +71,6 @@ kikidMain _ = do | |||
67 | atomically $ closeTBMQueue newchans | 71 | atomically $ closeTBMQueue newchans |
68 | atomically $ closeTBMQueue incomming | 72 | atomically $ closeTBMQueue incomming |
69 | 73 | ||
70 | data ClientState = CState {csQueue :: TBMQueue KikiDMessage} | ||
71 | type ClientID = ThreadId | ||
72 | threadIdToClient = id | ||
73 | |||
74 | addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do | 74 | addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do |
75 | cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState) | 75 | cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState) |
76 | whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do | 76 | whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do |
@@ -99,7 +99,7 @@ purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do | |||
99 | handleMessage hdl outq = do | 99 | handleMessage hdl outq = do |
100 | line <- B.hGetLine hdl | 100 | line <- B.hGetLine hdl |
101 | tid <- myThreadId | 101 | tid <- myThreadId |
102 | case (Bytes.runGetS Bytes.deserialize (B.snoc line '\n') :: Either String KikiDMessage) of | 102 | case (Bytes.runGetS Bytes.deserialize line :: Either String KikiDMessage) of |
103 | Right msg -> do | 103 | Right msg -> do |
104 | syslog Notice ("Message decoded on thread=" <> show tid) | 104 | syslog Notice ("Message decoded on thread=" <> show tid) |
105 | syslog Notice ("Message: " <> show msg) | 105 | syslog Notice ("Message: " <> show msg) |