diff options
Diffstat (limited to 'kikid.hs')
-rw-r--r-- | kikid.hs | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/kikid.hs b/kikid.hs new file mode 100644 index 0000000..31426a3 --- /dev/null +++ b/kikid.hs | |||
@@ -0,0 +1,107 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | |||
3 | import System.Posix.Daemonize | ||
4 | import Control.Concurrent | ||
5 | import System.Posix.Syslog | ||
6 | import System.Posix.Signals | ||
7 | import System.Posix.User (getEffectiveUserID) | ||
8 | import KikiD.PortServer | ||
9 | import KikiD.Multiplex | ||
10 | import KikiD.Message | ||
11 | import Control.Concurrent.STM | ||
12 | import Control.Concurrent.STM.TBMQueue | ||
13 | import Control.Concurrent.Async | ||
14 | import Control.Monad | ||
15 | import Control.Monad.Loops | ||
16 | import Control.Exception | ||
17 | import Data.Monoid | ||
18 | import qualified Data.ByteString.Char8 as B | ||
19 | import Data.Serialize | ||
20 | import qualified Data.Map as M | ||
21 | |||
22 | -- TODO: Set this in config file | ||
23 | port = 9800 | ||
24 | max_conns = 100 | ||
25 | qsize = 20 | ||
26 | qdelay = 5*10^5 | ||
27 | |||
28 | doNothing = return () | ||
29 | |||
30 | main = do | ||
31 | me <- getEffectiveUserID | ||
32 | if me == 0 then | ||
33 | serviced simpleDaemon { privilegedAction = startupAsRoot | ||
34 | , program = kikidMain | ||
35 | , user = Just "root", group = Just "root" } | ||
36 | else putStrLn "Kiki Daemon must be run as root." | ||
37 | |||
38 | startupAsRoot = syslog Notice "kikid Started." | ||
39 | |||
40 | kikidMain _ = do | ||
41 | refreshFlag <- atomically $ newTVar True | ||
42 | whileM_ (atomically $ readTVar refreshFlag) $ do | ||
43 | incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage) | ||
44 | newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage)) | ||
45 | currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState)) | ||
46 | let timeToQuit = atomically . isClosedTBMQueue $ newchans | ||
47 | installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.") | ||
48 | atomically $ writeTVar refreshFlag False | ||
49 | atomically $ closeTBMQueue newchans | ||
50 | atomically $ closeTBMQueue incomming)) (Just fullSignalSet) | ||
51 | installHandler sigHUP (Catch (do syslog Notice ("Caught sigHUP: Refreshing..") | ||
52 | atomically $ closeTBMQueue newchans | ||
53 | atomically $ closeTBMQueue incomming)) (Just fullSignalSet) | ||
54 | (_,ex) <- join . fmap waitAnyCatch $ mapM async | ||
55 | [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage | ||
56 | , addOpenConnections newchans currentClients | ||
57 | , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients) | ||
58 | , purgeClosedConnections timeToQuit currentClients | ||
59 | ] | ||
60 | case (ex::Either SomeException ()) of | ||
61 | Left e -> syslog Notice ("Exception: " <> show e) | ||
62 | Right _ -> doNothing | ||
63 | |||
64 | atomically $ closeTBMQueue newchans | ||
65 | atomically $ closeTBMQueue incomming | ||
66 | |||
67 | data ClientState = CState {cs_queue :: TBMQueue KikiDMessage} | ||
68 | type ClientID = ThreadId | ||
69 | threadIdToClient = id | ||
70 | |||
71 | addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do | ||
72 | cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState) | ||
73 | whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do | ||
74 | x <- atomically $ readTBMQueue newchans | ||
75 | case x of | ||
76 | Just (newClientThread,clientQ) -> do -- Is clientQ input or output? | ||
77 | syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?") | ||
78 | atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ)) | ||
79 | Nothing -> doNothing | ||
80 | threadDelay qdelay | ||
81 | |||
82 | purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do | ||
83 | map <- atomically $ readTVar currentClients | ||
84 | let k = M.keys map | ||
85 | e = M.elems map | ||
86 | closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e | ||
87 | let f False a b = [(a,b)] -- still open | ||
88 | f True _ _ = [] -- closed | ||
89 | closing = filter fst (zip closedClients k) | ||
90 | if (not . null $ closing) | ||
91 | then syslog Notice ("Closing connections: " ++ show closing) | ||
92 | else doNothing | ||
93 | atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e | ||
94 | threadDelay qdelay | ||
95 | |||
96 | handleMessage hdl outq = do | ||
97 | line <- B.hGetLine hdl | ||
98 | tid <- myThreadId | ||
99 | case (decode line :: Either String KikiDMessage) of | ||
100 | Right _ -> | ||
101 | syslog Notice ("Message decoded on thread=" <> show tid) | ||
102 | Left str -> do | ||
103 | syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid) | ||
104 | syslog Notice str | ||
105 | |||
106 | consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg) | ||
107 | -- TODO: Do more here... | ||