summaryrefslogtreecommitdiff
path: root/kikid.hs
diff options
context:
space:
mode:
Diffstat (limited to 'kikid.hs')
-rw-r--r--kikid.hs107
1 files changed, 107 insertions, 0 deletions
diff --git a/kikid.hs b/kikid.hs
new file mode 100644
index 0000000..31426a3
--- /dev/null
+++ b/kikid.hs
@@ -0,0 +1,107 @@
1{-# LANGUAGE OverloadedStrings #-}
2
3import System.Posix.Daemonize
4import Control.Concurrent
5import System.Posix.Syslog
6import System.Posix.Signals
7import System.Posix.User (getEffectiveUserID)
8import KikiD.PortServer
9import KikiD.Multiplex
10import KikiD.Message
11import Control.Concurrent.STM
12import Control.Concurrent.STM.TBMQueue
13import Control.Concurrent.Async
14import Control.Monad
15import Control.Monad.Loops
16import Control.Exception
17import Data.Monoid
18import qualified Data.ByteString.Char8 as B
19import Data.Serialize
20import qualified Data.Map as M
21
22-- TODO: Set this in config file
23port = 9800
24max_conns = 100
25qsize = 20
26qdelay = 5*10^5
27
28doNothing = return ()
29
30main = do
31 me <- getEffectiveUserID
32 if me == 0 then
33 serviced simpleDaemon { privilegedAction = startupAsRoot
34 , program = kikidMain
35 , user = Just "root", group = Just "root" }
36 else putStrLn "Kiki Daemon must be run as root."
37
38startupAsRoot = syslog Notice "kikid Started."
39
40kikidMain _ = do
41 refreshFlag <- atomically $ newTVar True
42 whileM_ (atomically $ readTVar refreshFlag) $ do
43 incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage)
44 newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage))
45 currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState))
46 let timeToQuit = atomically . isClosedTBMQueue $ newchans
47 installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.")
48 atomically $ writeTVar refreshFlag False
49 atomically $ closeTBMQueue newchans
50 atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
51 installHandler sigHUP (Catch (do syslog Notice ("Caught sigHUP: Refreshing..")
52 atomically $ closeTBMQueue newchans
53 atomically $ closeTBMQueue incomming)) (Just fullSignalSet)
54 (_,ex) <- join . fmap waitAnyCatch $ mapM async
55 [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage
56 , addOpenConnections newchans currentClients
57 , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients)
58 , purgeClosedConnections timeToQuit currentClients
59 ]
60 case (ex::Either SomeException ()) of
61 Left e -> syslog Notice ("Exception: " <> show e)
62 Right _ -> doNothing
63
64 atomically $ closeTBMQueue newchans
65 atomically $ closeTBMQueue incomming
66
67data ClientState = CState {cs_queue :: TBMQueue KikiDMessage}
68type ClientID = ThreadId
69threadIdToClient = id
70
71addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do
72 cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState)
73 whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do
74 x <- atomically $ readTBMQueue newchans
75 case x of
76 Just (newClientThread,clientQ) -> do -- Is clientQ input or output?
77 syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?")
78 atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ))
79 Nothing -> doNothing
80 threadDelay qdelay
81
82purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do
83 map <- atomically $ readTVar currentClients
84 let k = M.keys map
85 e = M.elems map
86 closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e
87 let f False a b = [(a,b)] -- still open
88 f True _ _ = [] -- closed
89 closing = filter fst (zip closedClients k)
90 if (not . null $ closing)
91 then syslog Notice ("Closing connections: " ++ show closing)
92 else doNothing
93 atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e
94 threadDelay qdelay
95
96handleMessage hdl outq = do
97 line <- B.hGetLine hdl
98 tid <- myThreadId
99 case (decode line :: Either String KikiDMessage) of
100 Right _ ->
101 syslog Notice ("Message decoded on thread=" <> show tid)
102 Left str -> do
103 syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid)
104 syslog Notice str
105
106consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg)
107-- TODO: Do more here...