summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2015-06-21 07:36:46 -0400
committerJames Crayne <jim.crayne@gmail.com>2015-06-21 15:35:26 -0400
commitd420aefe0b14081b00d4655f9e8317903b5b02c3 (patch)
tree7b58c8a784d1ef23be05e62aaf0c8311353d57d5
parent75d4b3b8bc60ff1d7ccc3990833a4439c0af02bc (diff)
kikid: The Beginnings of the Kiki Daemon
-rw-r--r--KikiD/GetLine.hs18
-rw-r--r--KikiD/Message.hs18
-rw-r--r--KikiD/Multiplex.hs100
-rw-r--r--KikiD/PortServer.hs145
-rw-r--r--kiki.cabal18
-rw-r--r--kikid.hs107
6 files changed, 405 insertions, 1 deletions
diff --git a/KikiD/GetLine.hs b/KikiD/GetLine.hs
new file mode 100644
index 0000000..8af5dc6
--- /dev/null
+++ b/KikiD/GetLine.hs
@@ -0,0 +1,18 @@
1module KikiD.GetLine where
2
3import Control.Monad
4import Data.Serialize
5import qualified Data.ByteString as BS
6import qualified Data.ByteString.Lazy as L
7import Data.Monoid
8import Data.Binary.Builder
9
10getLine :: Get BS.ByteString
11getLine = getWords empty
12 where
13 getWords b = do
14 w <- getWord8
15 let x = singleton w
16 if (w == 10 || w == 0)
17 then return $ BS.concat . L.toChunks . toLazyByteString $ b <> x
18 else getWords (b <> x)
diff --git a/KikiD/Message.hs b/KikiD/Message.hs
new file mode 100644
index 0000000..c4903cc
--- /dev/null
+++ b/KikiD/Message.hs
@@ -0,0 +1,18 @@
1module KikiD.Message where
2
3import Data.Serialize
4import qualified KikiD.GetLine
5import qualified Data.ByteString.Char8 as B
6import Data.Monoid
7import Text.Read
8
9data KikiDMessage = TODO deriving (Show,Read)
10
11instance Serialize KikiDMessage where
12 put = putByteString . B.pack . show
13 get = do
14 x <- KikiD.GetLine.getLine
15 case (readEither (B.unpack x) :: Either String KikiDMessage) of
16 Right m -> return m
17 Left er -> fail ("PARSE ERROR: " <> er)
18
diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs
new file mode 100644
index 0000000..4a31127
--- /dev/null
+++ b/KikiD/Multiplex.hs
@@ -0,0 +1,100 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE ViewPatterns #-}
3{-# LANGUAGE TupleSections #-}
4{-# LANGUAGE StandaloneDeriving #-}
5{-# LANGUAGE GeneralizedNewtypeDeriving #-}
6{-# LANGUAGE DeriveGeneric #-}
7{-# LANGUAGE BangPatterns #-}
8module KikiD.Multiplex where
9
10import System.IO
11import qualified Data.ByteString.Char8 as B
12import Data.Monoid
13import Control.Concurrent.STM
14import Data.Map.Strict as M
15import Control.Monad
16import Control.Concurrent
17import qualified Data.Binary as Bin
18import Control.Concurrent.STM.TBMQueue
19import Control.Monad.Loops
20import Data.List
21import Data.Maybe
22
23-- | pipeTransHookMicroseconds
24--
25-- This function indefinitely reads the @fromChan@ queue and applies
26-- the function @translate@ to the contents before passing it on to the
27-- @toChan@ queue. The @triggerAction@ is performed on the message prior
28-- to the translation. The @fromChan@ queue is checked every @micros@
29-- microseconds from the last emptying.
30--
31-- To terminate the thread, close @fromChan@ queue.
32--
33pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO ()
34pipeTransHookMicroseconds fromChan toChan micros translate triggerAction =
35 whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do
36 whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do
37 msg <- atomically $ readTBMQueue fromChan
38 case msg of
39 Just m' -> do
40 x <- triggerAction m'
41 case translate x m' of
42 Just m -> atomically $ writeTBMQueue toChan m
43 _ -> return ()
44 _ -> return ()
45 threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads
46
47pipeTransHook fromChan toChan translate triggerAction =
48 pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction
49
50pipeTrans fromChan toChan translate =
51 pipeTransHook fromChan toChan translate (void . return)
52
53pipeHook fromChan toChan triggerAction =
54 pipeTransHook fromChan toChan id triggerAction
55
56pipeQueue fromChan toChan =
57 pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return)
58
59teePipeQueueMicroseconds fromChan toChan1 toChan2 micros =
60 whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do
61 whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do
62 msg <- atomically $ readTBMQueue fromChan
63 case msg of
64 Just m -> do
65 atomically $ writeTBMQueue toChan1 m
66 atomically $ writeTBMQueue toChan2 m
67 _ -> return ()
68 threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads
69
70teePipeQueue fromChan toChan1 toChan2 =
71 teePipeQueueMicroseconds fromChan toChan1 toChan2 5000
72
73
74-- Deprecated: Use consumeQueueMicroseconds
75-- TODO: Remove this
76withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do
77 whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do
78 t <- atomically $ readTBMQueue fromChan
79 case t of
80 Just x -> action x
81 Nothing -> return ()
82 threadDelay delay
83
84{-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-}
85withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action
86{-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-}
87
88-- | consumeQueueMicroseconds
89-- (as of version 1.0.4)
90--
91-- Continously run the provided action on items
92-- from the provided queue. Delay for provided
93-- microseconds each time the queue is emptied.
94consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do
95 whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do
96 x <- atomically $ readTBMQueue q
97 case x of
98 Just s -> action s
99 Nothing -> return ()
100 threadDelay micros
diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs
new file mode 100644
index 0000000..31101a7
--- /dev/null
+++ b/KikiD/PortServer.hs
@@ -0,0 +1,145 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE ViewPatterns #-}
3{-# LANGUAGE BangPatterns #-}
4module KikiD.PortServer (createTCPPortListener) where
5
6import qualified Data.ByteString.Char8 as B
7import Network.Socket hiding (send)
8import Network.Socket.ByteString
9import Data.Monoid ((<>))
10--import qualified Network.IRC as IRC
11import Network.HTTP.Base (catchIO,catchIO_)
12import Control.Concurrent.STM
13import Control.Concurrent
14import Control.Monad
15import Control.Monad.Fix
16import System.IO
17import System.Directory (getAppUserDataDirectory)
18import Text.Printf (printf)
19import System.FilePath ((</>))
20import Control.Concurrent.STM.TBMQueue
21import Control.Monad.Loops
22import KikiD.Multiplex (pipeTransHookMicroseconds)
23import Control.Exception
24import Control.Concurrent.Async
25import Data.Serialize
26
27import Control.Arrow (second)
28--import qualified Merv.GetLine as MG
29
30{-instance Serialize IRC.Message where
31 put = putByteString . IRC.encode
32 get = do
33 x <- MG.getLine
34 case IRC.decode x of
35 Just x -> return x
36 Nothing -> fail ("IRC PARSE ERROR:'" <> B.unpack x <> "'")
37
38
39createIRCPortListener :: PortNumber -> B.ByteString -> Int -> Int -> Int
40 -> TBMQueue (ThreadId,TBMQueue IRC.Message) -> TBMQueue IRC.Message -> IO ()
41createIRCPortListener port name delay qsize maxconns postNewTChans outq =
42 createTCPPortListener port name delay qsize maxconns postNewTChans outq ircReact
43
44-}
45
46createTCPPortListener :: Serialize a => PortNumber -> B.ByteString -> Int -> Int -> Int
47 -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
48 -> (Handle -> TBMQueue a -> IO ()) -> IO ()
49createTCPPortListener port name delay qsize maxconns postNewTChans outq react =
50 bracket
51 -- aquire resources
52 (socket AF_INET Stream 0)
53
54 -- release resources
55 sClose
56
57 -- operate on resources
58 (\sock -> do
59 -- make socket immediately reusable - eases debugging.
60 setSocketOption sock ReuseAddr 1
61 -- listen on TCP port 4242
62 bindSocket sock (SockAddrInet port iNADDR_ANY)
63 -- allow a maximum of 15 outstanding connections
64 listen sock maxconns
65 sockAcceptLoop sock name delay qsize postNewTChans outq react
66 )
67
68sockAcceptLoop :: Serialize a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
69 -> (Handle -> TBMQueue a -> IO ()) -> IO ()
70sockAcceptLoop listenSock name delay qsize postNewTChans outq react =
71 whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do
72 -- accept one connection and handle it
73 conn@(sock,_) <- accept listenSock
74 async $ bracket (do
75 -- acquire resources
76 hdl <- socketToHandle sock ReadWriteMode
77 q <- atomically $ newTBMQueue qsize
78 thisChildOut <- atomically $ newTBMQueue qsize
79 async1 <- async (runConn hdl name q thisChildOut delay react)
80 async2 <- async (pipeTransHookMicroseconds thisChildOut outq 5000
81 (\() -> Just) -- no translation on outgoing
82 (\m -> return ()))
83 return (hdl,q,thisChildOut,(async1,async2))
84 )
85 -- release resources
86 (\(hdl,q,thisChildOut,(async1,async2)) -> do
87 cancel async1
88 cancel async2
89 atomically $ closeTBMQueue q
90 atomically $ closeTBMQueue thisChildOut
91 hClose hdl
92 )
93 -- run opration on async
94 (\(_,q,_,(async1,async2)) -> do
95 let tid = asyncThreadId async1
96 atomically $ writeTBMQueue postNewTChans (tid,q)
97 --link2 async1 async2 -- Do I need this?
98 waitBoth async1 async2
99 )
100
101runConn :: Serialize a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int
102 -> (Handle -> TBMQueue a -> IO ()) -> IO ()
103runConn hdl name q outq delay react = do
104 --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
105 -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
106 -- OnConnect Message...
107
108 race_
109 -- continuously read q and output to handle (socket)
110 -- to terminate thread, close q
111 (do
112 let pending = fmap not (atomically $ isEmptyTBMQueue q)
113 closed = atomically . isClosedTBMQueue $ q
114 whileM_ (fmap not closed) $ do
115 whileM_ pending $ do
116 m <- atomically (readTBMQueue q)
117 case m of
118 Just m -> B.hPutStrLn hdl (encode m)
119 -- Nothing means the Queue is closed and empty, so dont loop
120 Nothing -> return ()
121 threadDelay delay
122 --B.hPutStrLn hdl (encode (quit (Just "Bye!")) )
123 )
124
125 -- continuously input from handle and
126 -- send to provided outq
127 (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq )
128
129
130{-
131ircReact hdl outq = do
132 line <- B.hGetLine hdl
133 -- debugging
134 dir <- getAppUserDataDirectory "merv"
135 tid <- myThreadId
136 let bQuit = (B.isPrefixOf "/quit") line
137 appendFile (dir </> "xdebug")
138 (printf "%s:%s\n(bQuit=%s) %s\n" (show tid) (show line) (show bQuit) (show $ IRC.parseMessage line))
139 -- end debugging
140 case IRC.decode line of
141 Just (IRC.msg_command -> "QUIT") -> atomically $ closeTBMQueue outq
142 Just m -> atomically $ writeTBMQueue outq m
143 Nothing | "/q" `B.isPrefixOf` line -> atomically $ closeTBMQueue outq
144 _ -> return undefined
145-}
diff --git a/kiki.cabal b/kiki.cabal
index f811743..2632e6d 100644
--- a/kiki.cabal
+++ b/kiki.cabal
@@ -13,7 +13,9 @@ build-type: Simple
13 13
14Executable kiki 14Executable kiki
15 Main-is: kiki.hs 15 Main-is: kiki.hs
16 Build-Depends: base -any, directory -any, 16 -- base >=4.6 due to use of readEither in KikiD.Message
17 Build-Depends: base >=4.6.0.0,
18 directory -any,
17 openpgp-util -any, 19 openpgp-util -any,
18 crypto-pubkey (>=0.2.3), cryptohash -any, 20 crypto-pubkey (>=0.2.3), cryptohash -any,
19 crypto-pubkey-types -any, 21 crypto-pubkey-types -any,
@@ -31,5 +33,19 @@ Executable hosts
31 Main-is: hosts.hs 33 Main-is: hosts.hs
32 c-sources: dotlock.c 34 c-sources: dotlock.c
33 35
36Executable kikid
37 Main-is: kikid.hs
38 Build-Depends: base -any,
39 --kiki >=0.0.3,
40 hdaemonize >= 0.5,
41 hsyslog -any,
42 async >= 2.0.0,
43 stm-chans >= 2.0.0,
44 network >= 2.4 && < 3.0,
45 monad-loops -any,
46 HTTP -any,
47 stm >= 2.3,
48 cereal -any
49
34library 50library
35 exposed-modules: KeyRing 51 exposed-modules: KeyRing
diff --git a/kikid.hs b/kikid.hs
new file mode 100644
index 0000000..31426a3
--- /dev/null
+++ b/kikid.hs
@@ -0,0 +1,107 @@
1{-# LANGUAGE OverloadedStrings #-}
2
3import System.Posix.Daemonize
4import Control.Concurrent
5import System.Posix.Syslog
6import System.Posix.Signals
7import System.Posix.User (getEffectiveUserID)
8import KikiD.PortServer
9import KikiD.Multiplex
10import KikiD.Message
11import Control.Concurrent.STM
12import Control.Concurrent.STM.TBMQueue
13import Control.Concurrent.Async
14import Control.Monad
15import Control.Monad.Loops
16import Control.Exception
17import Data.Monoid
18import qualified Data.ByteString.Char8 as B
19import Data.Serialize
20import qualified Data.Map as M
21
22-- TODO: Set this in config file
23port = 9800
24max_conns = 100
25qsize = 20
26qdelay = 5*10^5
27
28doNothing = return ()
29
30main = do
31 me <- getEffectiveUserID
32 if me == 0 then
33 serviced simpleDaemon { privilegedAction = startupAsRoot
34 , program = kikidMain
35 , user = Just "root", group = Just "root" }
36 else putStrLn "Kiki Daemon must be run as root."
37
38startupAsRoot = syslog Notice "kikid Started."
39
40kikidMain _ = do
41 refreshFlag <- atomically $ newTVar True
42 whileM_ (atomically $ readTVar refreshFlag) $ do
43 incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
44 newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
45 currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState))
46 let timeToQuit = atomically . isClosedTBMQueue $ newchans
47 installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.")
48 atomically $ writeTVar refreshFlag False
49 atomically $ closeTBMQueue newchans
50 atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
51 installHandler sigHUP (Catch (do syslog Notice ("Caught sigHUP: Refreshing..")
52 atomically $ closeTBMQueue newchans
53 atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
54 (_,ex) <- join . fmap waitAnyCatch $ mapM async
55 [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage
56 , addOpenConnections newchans currentClients
57 , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients)
58 , purgeClosedConnections timeToQuit currentClients
59 ]
60 case (ex::Either SomeException ()) of
61 Left e -> syslog Notice ("Exception: " <> show e)
62 Right _ -> doNothing
63
64 atomically $ closeTBMQueue newchans
65 atomically $ closeTBMQueue incomming
66
67data ClientState = CState {cs_queue :: TBMQueue KikiDMessage}
68type ClientID = ThreadId
69threadIdToClient = id
70
71addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
72 cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState)
73 whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
74 x <- atomically $ readTBMQueue newchans
75 case x of
76 Just (newClientThread,clientQ) -> do -- Is clientQ input or output?
77 syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
78 atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ))
79 Nothing -> doNothing
80 threadDelay qdelay
81
82purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
83 map <- atomically $ readTVar currentClients
84 let k = M.keys map
85 e = M.elems map
86 closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e
87 let f False a b = [(a,b)] -- still open
88 f True _ _ = [] -- closed
89 closing = filter fst (zip closedClients k)
90 if (not . null $ closing)
91 then syslog Notice ("Closing connections: " ++ show closing)
92 else doNothing
93 atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e
94 threadDelay qdelay
95
96handleMessage hdl outq = do
97 line <- B.hGetLine hdl
98 tid <- myThreadId
99 case (decode line :: Either String KikiDMessage) of
100 Right _ ->
101 syslog Notice ("Message decoded on thread=" <> show tid)
102 Left str -> do
103 syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
104 syslog Notice str
105
106consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg)
107-- TODO: Do more here...