diff options
-rw-r--r-- | KikiD/GetLine.hs | 18 | ||||
-rw-r--r-- | KikiD/Message.hs | 18 | ||||
-rw-r--r-- | KikiD/Multiplex.hs | 100 | ||||
-rw-r--r-- | KikiD/PortServer.hs | 145 | ||||
-rw-r--r-- | kiki.cabal | 18 | ||||
-rw-r--r-- | kikid.hs | 107 |
6 files changed, 405 insertions, 1 deletions
diff --git a/KikiD/GetLine.hs b/KikiD/GetLine.hs new file mode 100644 index 0000000..8af5dc6 --- /dev/null +++ b/KikiD/GetLine.hs | |||
@@ -0,0 +1,18 @@ | |||
1 | module KikiD.GetLine where | ||
2 | |||
3 | import Control.Monad | ||
4 | import Data.Serialize | ||
5 | import qualified Data.ByteString as BS | ||
6 | import qualified Data.ByteString.Lazy as L | ||
7 | import Data.Monoid | ||
8 | import Data.Binary.Builder | ||
9 | |||
10 | getLine :: Get BS.ByteString | ||
11 | getLine = getWords empty | ||
12 | where | ||
13 | getWords b = do | ||
14 | w <- getWord8 | ||
15 | let x = singleton w | ||
16 | if (w == 10 || w == 0) | ||
17 | then return $ BS.concat . L.toChunks . toLazyByteString $ b <> x | ||
18 | else getWords (b <> x) | ||
diff --git a/KikiD/Message.hs b/KikiD/Message.hs new file mode 100644 index 0000000..c4903cc --- /dev/null +++ b/KikiD/Message.hs | |||
@@ -0,0 +1,18 @@ | |||
1 | module KikiD.Message where | ||
2 | |||
3 | import Data.Serialize | ||
4 | import qualified KikiD.GetLine | ||
5 | import qualified Data.ByteString.Char8 as B | ||
6 | import Data.Monoid | ||
7 | import Text.Read | ||
8 | |||
9 | data KikiDMessage = TODO deriving (Show,Read) | ||
10 | |||
11 | instance Serialize KikiDMessage where | ||
12 | put = putByteString . B.pack . show | ||
13 | get = do | ||
14 | x <- KikiD.GetLine.getLine | ||
15 | case (readEither (B.unpack x) :: Either String KikiDMessage) of | ||
16 | Right m -> return m | ||
17 | Left er -> fail ("PARSE ERROR: " <> er) | ||
18 | |||
diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs new file mode 100644 index 0000000..4a31127 --- /dev/null +++ b/KikiD/Multiplex.hs | |||
@@ -0,0 +1,100 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE ViewPatterns #-} | ||
3 | {-# LANGUAGE TupleSections #-} | ||
4 | {-# LANGUAGE StandaloneDeriving #-} | ||
5 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
6 | {-# LANGUAGE DeriveGeneric #-} | ||
7 | {-# LANGUAGE BangPatterns #-} | ||
8 | module KikiD.Multiplex where | ||
9 | |||
10 | import System.IO | ||
11 | import qualified Data.ByteString.Char8 as B | ||
12 | import Data.Monoid | ||
13 | import Control.Concurrent.STM | ||
14 | import Data.Map.Strict as M | ||
15 | import Control.Monad | ||
16 | import Control.Concurrent | ||
17 | import qualified Data.Binary as Bin | ||
18 | import Control.Concurrent.STM.TBMQueue | ||
19 | import Control.Monad.Loops | ||
20 | import Data.List | ||
21 | import Data.Maybe | ||
22 | |||
23 | -- | pipeTransHookMicroseconds | ||
24 | -- | ||
25 | -- This function indefinitely reads the @fromChan@ queue and applies | ||
26 | -- the function @translate@ to the contents before passing it on to the | ||
27 | -- @toChan@ queue. The @triggerAction@ is performed on the message prior | ||
28 | -- to the translation. The @fromChan@ queue is checked every @micros@ | ||
29 | -- microseconds from the last emptying. | ||
30 | -- | ||
31 | -- To terminate the thread, close @fromChan@ queue. | ||
32 | -- | ||
33 | pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO () | ||
34 | pipeTransHookMicroseconds fromChan toChan micros translate triggerAction = | ||
35 | whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do | ||
36 | whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do | ||
37 | msg <- atomically $ readTBMQueue fromChan | ||
38 | case msg of | ||
39 | Just m' -> do | ||
40 | x <- triggerAction m' | ||
41 | case translate x m' of | ||
42 | Just m -> atomically $ writeTBMQueue toChan m | ||
43 | _ -> return () | ||
44 | _ -> return () | ||
45 | threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads | ||
46 | |||
47 | pipeTransHook fromChan toChan translate triggerAction = | ||
48 | pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction | ||
49 | |||
50 | pipeTrans fromChan toChan translate = | ||
51 | pipeTransHook fromChan toChan translate (void . return) | ||
52 | |||
53 | pipeHook fromChan toChan triggerAction = | ||
54 | pipeTransHook fromChan toChan id triggerAction | ||
55 | |||
56 | pipeQueue fromChan toChan = | ||
57 | pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return) | ||
58 | |||
59 | teePipeQueueMicroseconds fromChan toChan1 toChan2 micros = | ||
60 | whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do | ||
61 | whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do | ||
62 | msg <- atomically $ readTBMQueue fromChan | ||
63 | case msg of | ||
64 | Just m -> do | ||
65 | atomically $ writeTBMQueue toChan1 m | ||
66 | atomically $ writeTBMQueue toChan2 m | ||
67 | _ -> return () | ||
68 | threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads | ||
69 | |||
70 | teePipeQueue fromChan toChan1 toChan2 = | ||
71 | teePipeQueueMicroseconds fromChan toChan1 toChan2 5000 | ||
72 | |||
73 | |||
74 | -- Deprecated: Use consumeQueueMicroseconds | ||
75 | -- TODO: Remove this | ||
76 | withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do | ||
77 | whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do | ||
78 | t <- atomically $ readTBMQueue fromChan | ||
79 | case t of | ||
80 | Just x -> action x | ||
81 | Nothing -> return () | ||
82 | threadDelay delay | ||
83 | |||
84 | {-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-} | ||
85 | withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action | ||
86 | {-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-} | ||
87 | |||
88 | -- | consumeQueueMicroseconds | ||
89 | -- (as of version 1.0.4) | ||
90 | -- | ||
91 | -- Continously run the provided action on items | ||
92 | -- from the provided queue. Delay for provided | ||
93 | -- microseconds each time the queue is emptied. | ||
94 | consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do | ||
95 | whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do | ||
96 | x <- atomically $ readTBMQueue q | ||
97 | case x of | ||
98 | Just s -> action s | ||
99 | Nothing -> return () | ||
100 | threadDelay micros | ||
diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs new file mode 100644 index 0000000..31101a7 --- /dev/null +++ b/KikiD/PortServer.hs | |||
@@ -0,0 +1,145 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE ViewPatterns #-} | ||
3 | {-# LANGUAGE BangPatterns #-} | ||
4 | module KikiD.PortServer (createTCPPortListener) where | ||
5 | |||
6 | import qualified Data.ByteString.Char8 as B | ||
7 | import Network.Socket hiding (send) | ||
8 | import Network.Socket.ByteString | ||
9 | import Data.Monoid ((<>)) | ||
10 | --import qualified Network.IRC as IRC | ||
11 | import Network.HTTP.Base (catchIO,catchIO_) | ||
12 | import Control.Concurrent.STM | ||
13 | import Control.Concurrent | ||
14 | import Control.Monad | ||
15 | import Control.Monad.Fix | ||
16 | import System.IO | ||
17 | import System.Directory (getAppUserDataDirectory) | ||
18 | import Text.Printf (printf) | ||
19 | import System.FilePath ((</>)) | ||
20 | import Control.Concurrent.STM.TBMQueue | ||
21 | import Control.Monad.Loops | ||
22 | import KikiD.Multiplex (pipeTransHookMicroseconds) | ||
23 | import Control.Exception | ||
24 | import Control.Concurrent.Async | ||
25 | import Data.Serialize | ||
26 | |||
27 | import Control.Arrow (second) | ||
28 | --import qualified Merv.GetLine as MG | ||
29 | |||
30 | {-instance Serialize IRC.Message where | ||
31 | put = putByteString . IRC.encode | ||
32 | get = do | ||
33 | x <- MG.getLine | ||
34 | case IRC.decode x of | ||
35 | Just x -> return x | ||
36 | Nothing -> fail ("IRC PARSE ERROR:'" <> B.unpack x <> "'") | ||
37 | |||
38 | |||
39 | createIRCPortListener :: PortNumber -> B.ByteString -> Int -> Int -> Int | ||
40 | -> TBMQueue (ThreadId,TBMQueue IRC.Message) -> TBMQueue IRC.Message -> IO () | ||
41 | createIRCPortListener port name delay qsize maxconns postNewTChans outq = | ||
42 | createTCPPortListener port name delay qsize maxconns postNewTChans outq ircReact | ||
43 | |||
44 | -} | ||
45 | |||
46 | createTCPPortListener :: Serialize a => PortNumber -> B.ByteString -> Int -> Int -> Int | ||
47 | -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a | ||
48 | -> (Handle -> TBMQueue a -> IO ()) -> IO () | ||
49 | createTCPPortListener port name delay qsize maxconns postNewTChans outq react = | ||
50 | bracket | ||
51 | -- aquire resources | ||
52 | (socket AF_INET Stream 0) | ||
53 | |||
54 | -- release resources | ||
55 | sClose | ||
56 | |||
57 | -- operate on resources | ||
58 | (\sock -> do | ||
59 | -- make socket immediately reusable - eases debugging. | ||
60 | setSocketOption sock ReuseAddr 1 | ||
61 | -- listen on TCP port 4242 | ||
62 | bindSocket sock (SockAddrInet port iNADDR_ANY) | ||
63 | -- allow a maximum of 15 outstanding connections | ||
64 | listen sock maxconns | ||
65 | sockAcceptLoop sock name delay qsize postNewTChans outq react | ||
66 | ) | ||
67 | |||
68 | sockAcceptLoop :: Serialize a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a | ||
69 | -> (Handle -> TBMQueue a -> IO ()) -> IO () | ||
70 | sockAcceptLoop listenSock name delay qsize postNewTChans outq react = | ||
71 | whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do | ||
72 | -- accept one connection and handle it | ||
73 | conn@(sock,_) <- accept listenSock | ||
74 | async $ bracket (do | ||
75 | -- acquire resources | ||
76 | hdl <- socketToHandle sock ReadWriteMode | ||
77 | q <- atomically $ newTBMQueue qsize | ||
78 | thisChildOut <- atomically $ newTBMQueue qsize | ||
79 | async1 <- async (runConn hdl name q thisChildOut delay react) | ||
80 | async2 <- async (pipeTransHookMicroseconds thisChildOut outq 5000 | ||
81 | (\() -> Just) -- no translation on outgoing | ||
82 | (\m -> return ())) | ||
83 | return (hdl,q,thisChildOut,(async1,async2)) | ||
84 | ) | ||
85 | -- release resources | ||
86 | (\(hdl,q,thisChildOut,(async1,async2)) -> do | ||
87 | cancel async1 | ||
88 | cancel async2 | ||
89 | atomically $ closeTBMQueue q | ||
90 | atomically $ closeTBMQueue thisChildOut | ||
91 | hClose hdl | ||
92 | ) | ||
93 | -- run opration on async | ||
94 | (\(_,q,_,(async1,async2)) -> do | ||
95 | let tid = asyncThreadId async1 | ||
96 | atomically $ writeTBMQueue postNewTChans (tid,q) | ||
97 | --link2 async1 async2 -- Do I need this? | ||
98 | waitBoth async1 async2 | ||
99 | ) | ||
100 | |||
101 | runConn :: Serialize a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int | ||
102 | -> (Handle -> TBMQueue a -> IO ()) -> IO () | ||
103 | runConn hdl name q outq delay react = do | ||
104 | --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) | ||
105 | -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) | ||
106 | -- OnConnect Message... | ||
107 | |||
108 | race_ | ||
109 | -- continuously read q and output to handle (socket) | ||
110 | -- to terminate thread, close q | ||
111 | (do | ||
112 | let pending = fmap not (atomically $ isEmptyTBMQueue q) | ||
113 | closed = atomically . isClosedTBMQueue $ q | ||
114 | whileM_ (fmap not closed) $ do | ||
115 | whileM_ pending $ do | ||
116 | m <- atomically (readTBMQueue q) | ||
117 | case m of | ||
118 | Just m -> B.hPutStrLn hdl (encode m) | ||
119 | -- Nothing means the Queue is closed and empty, so dont loop | ||
120 | Nothing -> return () | ||
121 | threadDelay delay | ||
122 | --B.hPutStrLn hdl (encode (quit (Just "Bye!")) ) | ||
123 | ) | ||
124 | |||
125 | -- continuously input from handle and | ||
126 | -- send to provided outq | ||
127 | (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq ) | ||
128 | |||
129 | |||
130 | {- | ||
131 | ircReact hdl outq = do | ||
132 | line <- B.hGetLine hdl | ||
133 | -- debugging | ||
134 | dir <- getAppUserDataDirectory "merv" | ||
135 | tid <- myThreadId | ||
136 | let bQuit = (B.isPrefixOf "/quit") line | ||
137 | appendFile (dir </> "xdebug") | ||
138 | (printf "%s:%s\n(bQuit=%s) %s\n" (show tid) (show line) (show bQuit) (show $ IRC.parseMessage line)) | ||
139 | -- end debugging | ||
140 | case IRC.decode line of | ||
141 | Just (IRC.msg_command -> "QUIT") -> atomically $ closeTBMQueue outq | ||
142 | Just m -> atomically $ writeTBMQueue outq m | ||
143 | Nothing | "/q" `B.isPrefixOf` line -> atomically $ closeTBMQueue outq | ||
144 | _ -> return undefined | ||
145 | -} | ||
@@ -13,7 +13,9 @@ build-type: Simple | |||
13 | 13 | ||
14 | Executable kiki | 14 | Executable kiki |
15 | Main-is: kiki.hs | 15 | Main-is: kiki.hs |
16 | Build-Depends: base -any, directory -any, | 16 | -- base >=4.6 due to use of readEither in KikiD.Message |
17 | Build-Depends: base >=4.6.0.0, | ||
18 | directory -any, | ||
17 | openpgp-util -any, | 19 | openpgp-util -any, |
18 | crypto-pubkey (>=0.2.3), cryptohash -any, | 20 | crypto-pubkey (>=0.2.3), cryptohash -any, |
19 | crypto-pubkey-types -any, | 21 | crypto-pubkey-types -any, |
@@ -31,5 +33,19 @@ Executable hosts | |||
31 | Main-is: hosts.hs | 33 | Main-is: hosts.hs |
32 | c-sources: dotlock.c | 34 | c-sources: dotlock.c |
33 | 35 | ||
36 | Executable kikid | ||
37 | Main-is: kikid.hs | ||
38 | Build-Depends: base -any, | ||
39 | --kiki >=0.0.3, | ||
40 | hdaemonize >= 0.5, | ||
41 | hsyslog -any, | ||
42 | async >= 2.0.0, | ||
43 | stm-chans >= 2.0.0, | ||
44 | network >= 2.4 && < 3.0, | ||
45 | monad-loops -any, | ||
46 | HTTP -any, | ||
47 | stm >= 2.3, | ||
48 | cereal -any | ||
49 | |||
34 | library | 50 | library |
35 | exposed-modules: KeyRing | 51 | exposed-modules: KeyRing |
diff --git a/kikid.hs b/kikid.hs new file mode 100644 index 0000000..31426a3 --- /dev/null +++ b/kikid.hs | |||
@@ -0,0 +1,107 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | |||
3 | import System.Posix.Daemonize | ||
4 | import Control.Concurrent | ||
5 | import System.Posix.Syslog | ||
6 | import System.Posix.Signals | ||
7 | import System.Posix.User (getEffectiveUserID) | ||
8 | import KikiD.PortServer | ||
9 | import KikiD.Multiplex | ||
10 | import KikiD.Message | ||
11 | import Control.Concurrent.STM | ||
12 | import Control.Concurrent.STM.TBMQueue | ||
13 | import Control.Concurrent.Async | ||
14 | import Control.Monad | ||
15 | import Control.Monad.Loops | ||
16 | import Control.Exception | ||
17 | import Data.Monoid | ||
18 | import qualified Data.ByteString.Char8 as B | ||
19 | import Data.Serialize | ||
20 | import qualified Data.Map as M | ||
21 | |||
22 | -- TODO: Set this in config file | ||
23 | port = 9800 | ||
24 | max_conns = 100 | ||
25 | qsize = 20 | ||
26 | qdelay = 5*10^5 | ||
27 | |||
28 | doNothing = return () | ||
29 | |||
30 | main = do | ||
31 | me <- getEffectiveUserID | ||
32 | if me == 0 then | ||
33 | serviced simpleDaemon { privilegedAction = startupAsRoot | ||
34 | , program = kikidMain | ||
35 | , user = Just "root", group = Just "root" } | ||
36 | else putStrLn "Kiki Daemon must be run as root." | ||
37 | |||
38 | startupAsRoot = syslog Notice "kikid Started." | ||
39 | |||
40 | kikidMain _ = do | ||
41 | refreshFlag <- atomically $ newTVar True | ||
42 | whileM_ (atomically $ readTVar refreshFlag) $ do | ||
43 | incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage) | ||
44 | newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage)) | ||
45 | currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState)) | ||
46 | let timeToQuit = atomically . isClosedTBMQueue $ newchans | ||
47 | installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.") | ||
48 | atomically $ writeTVar refreshFlag False | ||
49 | atomically $ closeTBMQueue newchans | ||
50 | atomically $ closeTBMQueue incomming)) (Just fullSignalSet) | ||
51 | installHandler sigHUP (Catch (do syslog Notice ("Caught sigHUP: Refreshing..") | ||
52 | atomically $ closeTBMQueue newchans | ||
53 | atomically $ closeTBMQueue incomming)) (Just fullSignalSet) | ||
54 | (_,ex) <- join . fmap waitAnyCatch $ mapM async | ||
55 | [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage | ||
56 | , addOpenConnections newchans currentClients | ||
57 | , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients) | ||
58 | , purgeClosedConnections timeToQuit currentClients | ||
59 | ] | ||
60 | case (ex::Either SomeException ()) of | ||
61 | Left e -> syslog Notice ("Exception: " <> show e) | ||
62 | Right _ -> doNothing | ||
63 | |||
64 | atomically $ closeTBMQueue newchans | ||
65 | atomically $ closeTBMQueue incomming | ||
66 | |||
67 | data ClientState = CState {cs_queue :: TBMQueue KikiDMessage} | ||
68 | type ClientID = ThreadId | ||
69 | threadIdToClient = id | ||
70 | |||
71 | addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do | ||
72 | cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState) | ||
73 | whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do | ||
74 | x <- atomically $ readTBMQueue newchans | ||
75 | case x of | ||
76 | Just (newClientThread,clientQ) -> do -- Is clientQ input or output? | ||
77 | syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?") | ||
78 | atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ)) | ||
79 | Nothing -> doNothing | ||
80 | threadDelay qdelay | ||
81 | |||
82 | purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do | ||
83 | map <- atomically $ readTVar currentClients | ||
84 | let k = M.keys map | ||
85 | e = M.elems map | ||
86 | closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e | ||
87 | let f False a b = [(a,b)] -- still open | ||
88 | f True _ _ = [] -- closed | ||
89 | closing = filter fst (zip closedClients k) | ||
90 | if (not . null $ closing) | ||
91 | then syslog Notice ("Closing connections: " ++ show closing) | ||
92 | else doNothing | ||
93 | atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e | ||
94 | threadDelay qdelay | ||
95 | |||
96 | handleMessage hdl outq = do | ||
97 | line <- B.hGetLine hdl | ||
98 | tid <- myThreadId | ||
99 | case (decode line :: Either String KikiDMessage) of | ||
100 | Right _ -> | ||
101 | syslog Notice ("Message decoded on thread=" <> show tid) | ||
102 | Left str -> do | ||
103 | syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid) | ||
104 | syslog Notice str | ||
105 | |||
106 | consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg) | ||
107 | -- TODO: Do more here... | ||