summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Codec/LineReady.hs23
-rw-r--r--KikiD/ClientState.hs14
-rw-r--r--KikiD/Message.hs42
-rw-r--r--KikiD/Multiplex.hs100
-rw-r--r--KikiD/PortServer.hs114
-rw-r--r--kikid.hs112
6 files changed, 0 insertions, 405 deletions
diff --git a/Codec/LineReady.hs b/Codec/LineReady.hs
deleted file mode 100644
index a6961ca..0000000
--- a/Codec/LineReady.hs
+++ /dev/null
@@ -1,23 +0,0 @@
1module Codec.LineReady where
2
3import qualified Data.ByteString.Char8 as B
4import Data.Monoid
5import Data.List (foldl')
6import Data.Maybe
7
8toLineReady :: B.ByteString -> B.ByteString
9toLineReady blob =
10 let as = zip [0..] (B.unpack blob)
11 bs = filter ((=='\n') . snd) as
12 is = map fst bs
13 in B.pack (show is) <> foldl' (replaceCharStrIndex '#') blob is
14
15replaceCharStrIndex :: Char -> B.ByteString -> Int -> B.ByteString
16replaceCharStrIndex c str i = a <> B.singleton c <> B.drop 1 b
17 where (a,b) = B.splitAt i str
18
19fromLineReady :: B.ByteString -> B.ByteString
20fromLineReady str = foldl' (replaceCharStrIndex '\n') (B.drop 1 str') is
21 where is = map fst . mapMaybe B.readInt $
22 B.groupBy (\c d -> (c/=',')&&(d/=',')) ls
23 (ls,str') = B.break (==']') (B.drop 1 str)
diff --git a/KikiD/ClientState.hs b/KikiD/ClientState.hs
deleted file mode 100644
index a80a392..0000000
--- a/KikiD/ClientState.hs
+++ /dev/null
@@ -1,14 +0,0 @@
1module KikiD.ClientState where
2
3import KikiD.Message
4import Control.Concurrent.STM.TBMQueue
5import Control.Concurrent
6
7data ClientState = CState {cliQueue :: TBMQueue KikiDMessage}
8
9mkClient = CState
10 { cliQueue = error "ERROR CState: cliQueue parameter is required"
11 }
12
13type ClientID = ThreadId
14threadIdToClient = id
diff --git a/KikiD/Message.hs b/KikiD/Message.hs
deleted file mode 100644
index efefdc6..0000000
--- a/KikiD/Message.hs
+++ /dev/null
@@ -1,42 +0,0 @@
1{-# LANGUAGE DoAndIfThenElse #-}
2module KikiD.Message where
3
4import Data.Serialize as Cereal
5import qualified Data.ByteString.Char8 as B
6import Data.Monoid
7import Text.Read
8import Data.Char (ord,chr)
9import Control.Monad
10import Data.Bytes.Serial as R
11import Data.Bytes.Put as Put
12import Data.Bytes.Get as Get
13import Codec.LineReady
14import Control.Monad.Loops
15import Data.Word
16
17data KikiDMessage = TODO deriving (Show,Read)
18
19instance Serialize KikiDMessage where
20 put m = mapM_ (Cereal.putWord8 . fromIntegral . ord) "TO\nO"
21 -- putByteString . B.pack $ show m ++ "\n"
22 get = do
23 t <- Cereal.getWord8
24 o <- Cereal.getWord8
25 d <- Cereal.getWord8
26 o <- Cereal.getWord8
27 let s = map (chr . fromIntegral) [t,o,d,o]
28 if "TO\nO" == s
29 then return TODO
30 else fail ("Could not decode message: " ++ show s)
31
32instance Serial KikiDMessage where
33 serialize m = Put.putByteString . toLineReady . Cereal.encode $ m
34 deserialize = do
35 xs <- unfoldM $ do
36 flag <- Get.isEmpty
37 if flag then return Nothing else do
38 c <- fmap (chr . fromIntegral) Get.getWord8
39 if (c == '\n') then return Nothing else return (Just c)
40 case (Cereal.decode . fromLineReady $ B.pack xs) of
41 Left str -> fail str
42 Right x -> return x
diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs
deleted file mode 100644
index 4a31127..0000000
--- a/KikiD/Multiplex.hs
+++ /dev/null
@@ -1,100 +0,0 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE ViewPatterns #-}
3{-# LANGUAGE TupleSections #-}
4{-# LANGUAGE StandaloneDeriving #-}
5{-# LANGUAGE GeneralizedNewtypeDeriving #-}
6{-# LANGUAGE DeriveGeneric #-}
7{-# LANGUAGE BangPatterns #-}
8module KikiD.Multiplex where
9
10import System.IO
11import qualified Data.ByteString.Char8 as B
12import Data.Monoid
13import Control.Concurrent.STM
14import Data.Map.Strict as M
15import Control.Monad
16import Control.Concurrent
17import qualified Data.Binary as Bin
18import Control.Concurrent.STM.TBMQueue
19import Control.Monad.Loops
20import Data.List
21import Data.Maybe
22
23-- | pipeTransHookMicroseconds
24--
25-- This function indefinitely reads the @fromChan@ queue and applies
26-- the function @translate@ to the contents before passing it on to the
27-- @toChan@ queue. The @triggerAction@ is performed on the message prior
28-- to the translation. The @fromChan@ queue is checked every @micros@
29-- microseconds from the last emptying.
30--
31-- To terminate the thread, close @fromChan@ queue.
32--
33pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO ()
34pipeTransHookMicroseconds fromChan toChan micros translate triggerAction =
35 whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do
36 whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do
37 msg <- atomically $ readTBMQueue fromChan
38 case msg of
39 Just m' -> do
40 x <- triggerAction m'
41 case translate x m' of
42 Just m -> atomically $ writeTBMQueue toChan m
43 _ -> return ()
44 _ -> return ()
45 threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads
46
47pipeTransHook fromChan toChan translate triggerAction =
48 pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction
49
50pipeTrans fromChan toChan translate =
51 pipeTransHook fromChan toChan translate (void . return)
52
53pipeHook fromChan toChan triggerAction =
54 pipeTransHook fromChan toChan id triggerAction
55
56pipeQueue fromChan toChan =
57 pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return)
58
59teePipeQueueMicroseconds fromChan toChan1 toChan2 micros =
60 whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do
61 whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do
62 msg <- atomically $ readTBMQueue fromChan
63 case msg of
64 Just m -> do
65 atomically $ writeTBMQueue toChan1 m
66 atomically $ writeTBMQueue toChan2 m
67 _ -> return ()
68 threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads
69
70teePipeQueue fromChan toChan1 toChan2 =
71 teePipeQueueMicroseconds fromChan toChan1 toChan2 5000
72
73
74-- Deprecated: Use consumeQueueMicroseconds
75-- TODO: Remove this
76withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do
77 whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do
78 t <- atomically $ readTBMQueue fromChan
79 case t of
80 Just x -> action x
81 Nothing -> return ()
82 threadDelay delay
83
84{-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-}
85withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action
86{-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-}
87
88-- | consumeQueueMicroseconds
89-- (as of version 1.0.4)
90--
91-- Continously run the provided action on items
92-- from the provided queue. Delay for provided
93-- microseconds each time the queue is emptied.
94consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do
95 whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do
96 x <- atomically $ readTBMQueue q
97 case x of
98 Just s -> action s
99 Nothing -> return ()
100 threadDelay micros
diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs
deleted file mode 100644
index b42e340..0000000
--- a/KikiD/PortServer.hs
+++ /dev/null
@@ -1,114 +0,0 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE ViewPatterns #-}
3{-# LANGUAGE BangPatterns #-}
4module KikiD.PortServer (createTCPPortListener) where
5
6import qualified Data.ByteString.Char8 as B
7import Network.Socket hiding (send)
8import Network.Socket.ByteString
9import Data.Monoid ((<>))
10--import qualified Network.IRC as IRC
11import Network.HTTP.Base (catchIO,catchIO_)
12import Control.Concurrent.STM
13import Control.Concurrent
14import Control.Monad
15import Control.Monad.Fix
16import System.IO
17import System.Directory (getAppUserDataDirectory)
18import Text.Printf (printf)
19import System.FilePath ((</>))
20import Control.Concurrent.STM.TBMQueue
21import Control.Monad.Loops
22import KikiD.Multiplex (pipeTransHookMicroseconds)
23import Control.Exception
24import Control.Concurrent.Async
25import Data.Bytes.Serial as R
26import Data.Bytes.Put as Put
27
28import Control.Arrow (second)
29
30
31createTCPPortListener :: Serial a => PortNumber -> B.ByteString -> Int -> Int -> Int
32 -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
33 -> (Handle -> TBMQueue a -> IO ()) -> IO ()
34createTCPPortListener port name delay qsize maxconns postNewTChans outq react =
35 bracket
36 -- aquire resources
37 (socket AF_INET Stream 0)
38
39 -- release resources
40 sClose
41
42 -- operate on resources
43 (\sock -> do
44 -- make socket immediately reusable - eases debugging.
45 setSocketOption sock ReuseAddr 1
46 -- listen on TCP port 4242
47 bindSocket sock (SockAddrInet port iNADDR_ANY)
48 -- allow a maximum of 15 outstanding connections
49 listen sock maxconns
50 sockAcceptLoop sock name delay qsize postNewTChans outq react
51 )
52
53sockAcceptLoop :: Serial a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
54 -> (Handle -> TBMQueue a -> IO ()) -> IO ()
55sockAcceptLoop listenSock name delay qsize postNewTChans outq react =
56 whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do
57 -- accept one connection and handle it
58 conn@(sock,_) <- accept listenSock
59 async $ bracket (do
60 -- acquire resources
61 hdl <- socketToHandle sock ReadWriteMode
62 q <- atomically $ newTBMQueue qsize
63 thisChildOut <- atomically $ newTBMQueue qsize
64 async1 <- async (runConn hdl name q thisChildOut delay react)
65 async2 <- async (pipeTransHookMicroseconds thisChildOut outq 5000
66 (\() -> Just) -- no translation on outgoing
67 (\m -> return ()))
68 return (hdl,q,thisChildOut,(async1,async2))
69 )
70 -- release resources
71 (\(hdl,q,thisChildOut,(async1,async2)) -> do
72 cancel async1
73 cancel async2
74 atomically $ closeTBMQueue q
75 atomically $ closeTBMQueue thisChildOut
76 hClose hdl
77 )
78 -- run opration on async
79 (\(_,q,_,(async1,async2)) -> do
80 let tid = asyncThreadId async1
81 atomically $ writeTBMQueue postNewTChans (tid,q)
82 --link2 async1 async2 -- Do I need this?
83 waitBoth async1 async2
84 )
85
86runConn :: Serial a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int
87 -> (Handle -> TBMQueue a -> IO ()) -> IO ()
88runConn hdl name q outq delay react = do
89 --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
90 -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
91 -- OnConnect Message...
92
93 race_
94 -- continuously read q and output to handle (socket)
95 -- to terminate thread, close q
96 (do
97 let pending = fmap not (atomically $ isEmptyTBMQueue q)
98 closed = atomically . isClosedTBMQueue $ q
99 whileM_ (fmap not closed) $ do
100 whileM_ pending $ do
101 m <- atomically (readTBMQueue q)
102 case m of
103 Just m -> B.hPutStrLn hdl (runPutS $ R.serialize m)
104 -- Nothing means the Queue is closed and empty, so dont loop
105 Nothing -> return ()
106 threadDelay delay
107 --B.hPutStrLn hdl (encode (quit (Just "Bye!")) )
108 )
109
110 -- continuously input from handle and
111 -- send to provided outq
112 (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq )
113
114
diff --git a/kikid.hs b/kikid.hs
deleted file mode 100644
index 059a5df..0000000
--- a/kikid.hs
+++ /dev/null
@@ -1,112 +0,0 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE DoAndIfThenElse #-}
3
4import System.Posix.Daemonize
5import Control.Concurrent
6import System.Posix.Syslog
7import System.Posix.Signals
8import System.Posix.User (getEffectiveUserID)
9
10import Control.Concurrent.STM
11import Control.Concurrent.STM.TBMQueue
12import Control.Concurrent.Async
13import Control.Monad
14import Control.Monad.Loops
15import Control.Exception
16import Data.Monoid
17import qualified Data.ByteString.Char8 as B
18--import Data.Serialize
19import qualified Data.Map as M
20import qualified Data.Bytes.Serial as Bytes
21import qualified Data.Bytes.Get as Bytes
22import qualified Data.Bytes.Put as Bytes
23
24import KikiD.PortServer
25import KikiD.Multiplex
26import KikiD.Message
27import KikiD.ClientState
28
29-- TODO: Set this in config file
30port = 9800
31max_conns = 100
32qsize = 20
33qdelay = 5*10^5
34
35doNothing = return ()
36
37main = do
38 me <- getEffectiveUserID
39 if me == 0 then
40 serviced simpleDaemon { privilegedAction = startupAsRoot
41 , program = kikidMain
42 , user = Just "root", group = Just "root" }
43 else putStrLn "Kiki Daemon must be run as root."
44
45startupAsRoot = syslog Notice "kikid Started."
46
47kikidMain _ = do
48 refreshFlag <- atomically $ newTVar True
49 whileM_ (atomically $ readTVar refreshFlag) $ do
50 incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
51 newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
52 currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState))
53 let timeToQuit = atomically . isClosedTBMQueue $ newchans
54 installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.")
55 atomically $ writeTVar refreshFlag False
56 atomically $ closeTBMQueue newchans
57 atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
58 installHandler sigHUP (Catch (do syslog Notice ("Caught sigHUP: Refreshing..")
59 atomically $ closeTBMQueue newchans
60 atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
61 (_,ex) <- join . fmap waitAnyCatch $ mapM async
62 [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage
63 , addOpenConnections newchans currentClients
64 , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients)
65 , purgeClosedConnections timeToQuit currentClients
66 ]
67 case (ex::Either SomeException ()) of
68 Left e -> syslog Notice ("Exception: " <> show e)
69 Right _ -> doNothing
70
71 atomically $ closeTBMQueue newchans
72 atomically $ closeTBMQueue incomming
73
74addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
75 cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState)
76 whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
77 x <- atomically $ readTBMQueue newchans
78 case x of
79 Just (newClientThread,clientQ) -> do -- Is clientQ input or output?
80 syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
81 atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ))
82 Nothing -> doNothing
83 threadDelay qdelay
84
85purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
86 map <- atomically $ readTVar currentClients
87 let k = M.keys map
88 e = M.elems map
89 closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e
90 let f False a b = [(a,b)] -- still open
91 f True _ _ = [] -- closed
92 closing = filter fst (zip closedClients k)
93 if (not . null $ closing)
94 then syslog Notice ("Closing connections: " ++ show closing)
95 else doNothing
96 atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e
97 threadDelay qdelay
98
99handleMessage hdl outq = do
100 line <- B.hGetLine hdl
101 tid <- myThreadId
102 case (Bytes.runGetS Bytes.deserialize line :: Either String KikiDMessage) of
103 Right msg -> do
104 syslog Notice ("Message decoded on thread=" <> show tid)
105 syslog Notice ("Message: " <> show msg)
106 Left str -> do
107 syslog Notice ("ERROR: " <> show line)
108 syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
109 syslog Notice ("ERROR: " ++ str)
110
111consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg)
112-- TODO: Do more here...