diff options
Diffstat (limited to 'kikid.hs')
-rw-r--r-- | kikid.hs | 112 |
1 files changed, 0 insertions, 112 deletions
diff --git a/kikid.hs b/kikid.hs deleted file mode 100644 index 059a5df..0000000 --- a/kikid.hs +++ /dev/null | |||
@@ -1,112 +0,0 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE DoAndIfThenElse #-} | ||
3 | |||
4 | import System.Posix.Daemonize | ||
5 | import Control.Concurrent | ||
6 | import System.Posix.Syslog | ||
7 | import System.Posix.Signals | ||
8 | import System.Posix.User (getEffectiveUserID) | ||
9 | |||
10 | import Control.Concurrent.STM | ||
11 | import Control.Concurrent.STM.TBMQueue | ||
12 | import Control.Concurrent.Async | ||
13 | import Control.Monad | ||
14 | import Control.Monad.Loops | ||
15 | import Control.Exception | ||
16 | import Data.Monoid | ||
17 | import qualified Data.ByteString.Char8 as B | ||
18 | --import Data.Serialize | ||
19 | import qualified Data.Map as M | ||
20 | import qualified Data.Bytes.Serial as Bytes | ||
21 | import qualified Data.Bytes.Get as Bytes | ||
22 | import qualified Data.Bytes.Put as Bytes | ||
23 | |||
24 | import KikiD.PortServer | ||
25 | import KikiD.Multiplex | ||
26 | import KikiD.Message | ||
27 | import KikiD.ClientState | ||
28 | |||
29 | -- TODO: Set this in config file | ||
30 | port = 9800 | ||
31 | max_conns = 100 | ||
32 | qsize = 20 | ||
33 | qdelay = 5*10^5 | ||
34 | |||
35 | doNothing = return () | ||
36 | |||
37 | main = do | ||
38 | me <- getEffectiveUserID | ||
39 | if me == 0 then | ||
40 | serviced simpleDaemon { privilegedAction = startupAsRoot | ||
41 | , program = kikidMain | ||
42 | , user = Just "root", group = Just "root" } | ||
43 | else putStrLn "Kiki Daemon must be run as root." | ||
44 | |||
45 | startupAsRoot = syslog Notice "kikid Started." | ||
46 | |||
47 | kikidMain _ = do | ||
48 | refreshFlag <- atomically $ newTVar True | ||
49 | whileM_ (atomically $ readTVar refreshFlag) $ do | ||
50 | incomming <- newTBMQueueIO qsize :: IO (TBMQueue KikiDMessage) | ||
51 | newchans <- newTBMQueueIO qsize :: IO (TBMQueue (ThreadId, TBMQueue KikiDMessage)) | ||
52 | currentClients <- atomically $ newTVar (M.empty) :: IO (TVar (M.Map clientID ClientState)) | ||
53 | let timeToQuit = atomically . isClosedTBMQueue $ newchans | ||
54 | installHandler sigTERM (Catch (do syslog Notice ("Caught sigTERM: Shutting down.") | ||
55 | atomically $ writeTVar refreshFlag False | ||
56 | atomically $ closeTBMQueue newchans | ||
57 | atomically $ closeTBMQueue incomming)) (Just fullSignalSet) | ||
58 | installHandler sigHUP (Catch (do syslog Notice ("Caught sigHUP: Refreshing..") | ||
59 | atomically $ closeTBMQueue newchans | ||
60 | atomically $ closeTBMQueue incomming)) (Just fullSignalSet) | ||
61 | (_,ex) <- join . fmap waitAnyCatch $ mapM async | ||
62 | [ createTCPPortListener port "KikiD" qdelay qsize max_conns newchans incomming handleMessage | ||
63 | , addOpenConnections newchans currentClients | ||
64 | , consumeQueueMicroseconds incomming (qdelay `div` 2) (consumeMessage currentClients) | ||
65 | , purgeClosedConnections timeToQuit currentClients | ||
66 | ] | ||
67 | case (ex::Either SomeException ()) of | ||
68 | Left e -> syslog Notice ("Exception: " <> show e) | ||
69 | Right _ -> doNothing | ||
70 | |||
71 | atomically $ closeTBMQueue newchans | ||
72 | atomically $ closeTBMQueue incomming | ||
73 | |||
74 | addOpenConnections newchans currentClients = whileM_ (atomically . fmap not $ isClosedTBMQueue newchans) $ do | ||
75 | cliMap <- atomically $ readTVar currentClients :: IO (M.Map ClientID ClientState) | ||
76 | whileM_ (atomically . fmap not $ isEmptyTBMQueue newchans) $ do | ||
77 | x <- atomically $ readTBMQueue newchans | ||
78 | case x of | ||
79 | Just (newClientThread,clientQ) -> do -- Is clientQ input or output? | ||
80 | syslog Notice ("New connection, thread= " <> show newClientThread <> ", TODO: authenticate?") | ||
81 | atomically $ modifyTVar currentClients (M.insert (threadIdToClient newClientThread) (CState clientQ)) | ||
82 | Nothing -> doNothing | ||
83 | threadDelay qdelay | ||
84 | |||
85 | purgeClosedConnections quit currentClients = whileM_ (fmap not quit) $ do | ||
86 | map <- atomically $ readTVar currentClients | ||
87 | let k = M.keys map | ||
88 | e = M.elems map | ||
89 | closedClients <- mapM (\(CState q) -> atomically $ isClosedTBMQueue q) e | ||
90 | let f False a b = [(a,b)] -- still open | ||
91 | f True _ _ = [] -- closed | ||
92 | closing = filter fst (zip closedClients k) | ||
93 | if (not . null $ closing) | ||
94 | then syslog Notice ("Closing connections: " ++ show closing) | ||
95 | else doNothing | ||
96 | atomically . writeTVar currentClients . M.fromList . concat $ zipWith3 f closedClients k e | ||
97 | threadDelay qdelay | ||
98 | |||
99 | handleMessage hdl outq = do | ||
100 | line <- B.hGetLine hdl | ||
101 | tid <- myThreadId | ||
102 | case (Bytes.runGetS Bytes.deserialize line :: Either String KikiDMessage) of | ||
103 | Right msg -> do | ||
104 | syslog Notice ("Message decoded on thread=" <> show tid) | ||
105 | syslog Notice ("Message: " <> show msg) | ||
106 | Left str -> do | ||
107 | syslog Notice ("ERROR: " <> show line) | ||
108 | syslog Notice ("ERROR: Unable to decode message on thread=" <> show tid) | ||
109 | syslog Notice ("ERROR: " ++ str) | ||
110 | |||
111 | consumeMessage currentClients msg = void $ syslog Notice ("Recieved Message: " ++ show msg) | ||
112 | -- TODO: Do more here... | ||