diff options
author | James Crayne <jim.crayne@gmail.com> | 2016-04-28 17:25:01 -0400 |
---|---|---|
committer | James Crayne <jim.crayne@gmail.com> | 2016-04-28 17:25:01 -0400 |
commit | 4ec9cc5e6e1c71184c0537fb2fbd4387f27b3ac2 (patch) | |
tree | 67b144c4d839cd617cb4a9233b78802ad315f63c /KikiD | |
parent | e7db95eb413a311dbeddd8bf2474f3710b9258c0 (diff) |
remove kikid, moved to separate repo
Diffstat (limited to 'KikiD')
-rw-r--r-- | KikiD/ClientState.hs | 14 | ||||
-rw-r--r-- | KikiD/Message.hs | 42 | ||||
-rw-r--r-- | KikiD/Multiplex.hs | 100 | ||||
-rw-r--r-- | KikiD/PortServer.hs | 114 |
4 files changed, 0 insertions, 270 deletions
diff --git a/KikiD/ClientState.hs b/KikiD/ClientState.hs deleted file mode 100644 index a80a392..0000000 --- a/KikiD/ClientState.hs +++ /dev/null | |||
@@ -1,14 +0,0 @@ | |||
1 | module KikiD.ClientState where | ||
2 | |||
3 | import KikiD.Message | ||
4 | import Control.Concurrent.STM.TBMQueue | ||
5 | import Control.Concurrent | ||
6 | |||
7 | data ClientState = CState {cliQueue :: TBMQueue KikiDMessage} | ||
8 | |||
9 | mkClient = CState | ||
10 | { cliQueue = error "ERROR CState: cliQueue parameter is required" | ||
11 | } | ||
12 | |||
13 | type ClientID = ThreadId | ||
14 | threadIdToClient = id | ||
diff --git a/KikiD/Message.hs b/KikiD/Message.hs deleted file mode 100644 index efefdc6..0000000 --- a/KikiD/Message.hs +++ /dev/null | |||
@@ -1,42 +0,0 @@ | |||
1 | {-# LANGUAGE DoAndIfThenElse #-} | ||
2 | module KikiD.Message where | ||
3 | |||
4 | import Data.Serialize as Cereal | ||
5 | import qualified Data.ByteString.Char8 as B | ||
6 | import Data.Monoid | ||
7 | import Text.Read | ||
8 | import Data.Char (ord,chr) | ||
9 | import Control.Monad | ||
10 | import Data.Bytes.Serial as R | ||
11 | import Data.Bytes.Put as Put | ||
12 | import Data.Bytes.Get as Get | ||
13 | import Codec.LineReady | ||
14 | import Control.Monad.Loops | ||
15 | import Data.Word | ||
16 | |||
17 | data KikiDMessage = TODO deriving (Show,Read) | ||
18 | |||
19 | instance Serialize KikiDMessage where | ||
20 | put m = mapM_ (Cereal.putWord8 . fromIntegral . ord) "TO\nO" | ||
21 | -- putByteString . B.pack $ show m ++ "\n" | ||
22 | get = do | ||
23 | t <- Cereal.getWord8 | ||
24 | o <- Cereal.getWord8 | ||
25 | d <- Cereal.getWord8 | ||
26 | o <- Cereal.getWord8 | ||
27 | let s = map (chr . fromIntegral) [t,o,d,o] | ||
28 | if "TO\nO" == s | ||
29 | then return TODO | ||
30 | else fail ("Could not decode message: " ++ show s) | ||
31 | |||
32 | instance Serial KikiDMessage where | ||
33 | serialize m = Put.putByteString . toLineReady . Cereal.encode $ m | ||
34 | deserialize = do | ||
35 | xs <- unfoldM $ do | ||
36 | flag <- Get.isEmpty | ||
37 | if flag then return Nothing else do | ||
38 | c <- fmap (chr . fromIntegral) Get.getWord8 | ||
39 | if (c == '\n') then return Nothing else return (Just c) | ||
40 | case (Cereal.decode . fromLineReady $ B.pack xs) of | ||
41 | Left str -> fail str | ||
42 | Right x -> return x | ||
diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs deleted file mode 100644 index 4a31127..0000000 --- a/KikiD/Multiplex.hs +++ /dev/null | |||
@@ -1,100 +0,0 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE ViewPatterns #-} | ||
3 | {-# LANGUAGE TupleSections #-} | ||
4 | {-# LANGUAGE StandaloneDeriving #-} | ||
5 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
6 | {-# LANGUAGE DeriveGeneric #-} | ||
7 | {-# LANGUAGE BangPatterns #-} | ||
8 | module KikiD.Multiplex where | ||
9 | |||
10 | import System.IO | ||
11 | import qualified Data.ByteString.Char8 as B | ||
12 | import Data.Monoid | ||
13 | import Control.Concurrent.STM | ||
14 | import Data.Map.Strict as M | ||
15 | import Control.Monad | ||
16 | import Control.Concurrent | ||
17 | import qualified Data.Binary as Bin | ||
18 | import Control.Concurrent.STM.TBMQueue | ||
19 | import Control.Monad.Loops | ||
20 | import Data.List | ||
21 | import Data.Maybe | ||
22 | |||
23 | -- | pipeTransHookMicroseconds | ||
24 | -- | ||
25 | -- This function indefinitely reads the @fromChan@ queue and applies | ||
26 | -- the function @translate@ to the contents before passing it on to the | ||
27 | -- @toChan@ queue. The @triggerAction@ is performed on the message prior | ||
28 | -- to the translation. The @fromChan@ queue is checked every @micros@ | ||
29 | -- microseconds from the last emptying. | ||
30 | -- | ||
31 | -- To terminate the thread, close @fromChan@ queue. | ||
32 | -- | ||
33 | pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO () | ||
34 | pipeTransHookMicroseconds fromChan toChan micros translate triggerAction = | ||
35 | whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do | ||
36 | whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do | ||
37 | msg <- atomically $ readTBMQueue fromChan | ||
38 | case msg of | ||
39 | Just m' -> do | ||
40 | x <- triggerAction m' | ||
41 | case translate x m' of | ||
42 | Just m -> atomically $ writeTBMQueue toChan m | ||
43 | _ -> return () | ||
44 | _ -> return () | ||
45 | threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads | ||
46 | |||
47 | pipeTransHook fromChan toChan translate triggerAction = | ||
48 | pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction | ||
49 | |||
50 | pipeTrans fromChan toChan translate = | ||
51 | pipeTransHook fromChan toChan translate (void . return) | ||
52 | |||
53 | pipeHook fromChan toChan triggerAction = | ||
54 | pipeTransHook fromChan toChan id triggerAction | ||
55 | |||
56 | pipeQueue fromChan toChan = | ||
57 | pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return) | ||
58 | |||
59 | teePipeQueueMicroseconds fromChan toChan1 toChan2 micros = | ||
60 | whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do | ||
61 | whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do | ||
62 | msg <- atomically $ readTBMQueue fromChan | ||
63 | case msg of | ||
64 | Just m -> do | ||
65 | atomically $ writeTBMQueue toChan1 m | ||
66 | atomically $ writeTBMQueue toChan2 m | ||
67 | _ -> return () | ||
68 | threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads | ||
69 | |||
70 | teePipeQueue fromChan toChan1 toChan2 = | ||
71 | teePipeQueueMicroseconds fromChan toChan1 toChan2 5000 | ||
72 | |||
73 | |||
74 | -- Deprecated: Use consumeQueueMicroseconds | ||
75 | -- TODO: Remove this | ||
76 | withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do | ||
77 | whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do | ||
78 | t <- atomically $ readTBMQueue fromChan | ||
79 | case t of | ||
80 | Just x -> action x | ||
81 | Nothing -> return () | ||
82 | threadDelay delay | ||
83 | |||
84 | {-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-} | ||
85 | withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action | ||
86 | {-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-} | ||
87 | |||
88 | -- | consumeQueueMicroseconds | ||
89 | -- (as of version 1.0.4) | ||
90 | -- | ||
91 | -- Continously run the provided action on items | ||
92 | -- from the provided queue. Delay for provided | ||
93 | -- microseconds each time the queue is emptied. | ||
94 | consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do | ||
95 | whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do | ||
96 | x <- atomically $ readTBMQueue q | ||
97 | case x of | ||
98 | Just s -> action s | ||
99 | Nothing -> return () | ||
100 | threadDelay micros | ||
diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs deleted file mode 100644 index b42e340..0000000 --- a/KikiD/PortServer.hs +++ /dev/null | |||
@@ -1,114 +0,0 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE ViewPatterns #-} | ||
3 | {-# LANGUAGE BangPatterns #-} | ||
4 | module KikiD.PortServer (createTCPPortListener) where | ||
5 | |||
6 | import qualified Data.ByteString.Char8 as B | ||
7 | import Network.Socket hiding (send) | ||
8 | import Network.Socket.ByteString | ||
9 | import Data.Monoid ((<>)) | ||
10 | --import qualified Network.IRC as IRC | ||
11 | import Network.HTTP.Base (catchIO,catchIO_) | ||
12 | import Control.Concurrent.STM | ||
13 | import Control.Concurrent | ||
14 | import Control.Monad | ||
15 | import Control.Monad.Fix | ||
16 | import System.IO | ||
17 | import System.Directory (getAppUserDataDirectory) | ||
18 | import Text.Printf (printf) | ||
19 | import System.FilePath ((</>)) | ||
20 | import Control.Concurrent.STM.TBMQueue | ||
21 | import Control.Monad.Loops | ||
22 | import KikiD.Multiplex (pipeTransHookMicroseconds) | ||
23 | import Control.Exception | ||
24 | import Control.Concurrent.Async | ||
25 | import Data.Bytes.Serial as R | ||
26 | import Data.Bytes.Put as Put | ||
27 | |||
28 | import Control.Arrow (second) | ||
29 | |||
30 | |||
31 | createTCPPortListener :: Serial a => PortNumber -> B.ByteString -> Int -> Int -> Int | ||
32 | -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a | ||
33 | -> (Handle -> TBMQueue a -> IO ()) -> IO () | ||
34 | createTCPPortListener port name delay qsize maxconns postNewTChans outq react = | ||
35 | bracket | ||
36 | -- aquire resources | ||
37 | (socket AF_INET Stream 0) | ||
38 | |||
39 | -- release resources | ||
40 | sClose | ||
41 | |||
42 | -- operate on resources | ||
43 | (\sock -> do | ||
44 | -- make socket immediately reusable - eases debugging. | ||
45 | setSocketOption sock ReuseAddr 1 | ||
46 | -- listen on TCP port 4242 | ||
47 | bindSocket sock (SockAddrInet port iNADDR_ANY) | ||
48 | -- allow a maximum of 15 outstanding connections | ||
49 | listen sock maxconns | ||
50 | sockAcceptLoop sock name delay qsize postNewTChans outq react | ||
51 | ) | ||
52 | |||
53 | sockAcceptLoop :: Serial a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a | ||
54 | -> (Handle -> TBMQueue a -> IO ()) -> IO () | ||
55 | sockAcceptLoop listenSock name delay qsize postNewTChans outq react = | ||
56 | whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do | ||
57 | -- accept one connection and handle it | ||
58 | conn@(sock,_) <- accept listenSock | ||
59 | async $ bracket (do | ||
60 | -- acquire resources | ||
61 | hdl <- socketToHandle sock ReadWriteMode | ||
62 | q <- atomically $ newTBMQueue qsize | ||
63 | thisChildOut <- atomically $ newTBMQueue qsize | ||
64 | async1 <- async (runConn hdl name q thisChildOut delay react) | ||
65 | async2 <- async (pipeTransHookMicroseconds thisChildOut outq 5000 | ||
66 | (\() -> Just) -- no translation on outgoing | ||
67 | (\m -> return ())) | ||
68 | return (hdl,q,thisChildOut,(async1,async2)) | ||
69 | ) | ||
70 | -- release resources | ||
71 | (\(hdl,q,thisChildOut,(async1,async2)) -> do | ||
72 | cancel async1 | ||
73 | cancel async2 | ||
74 | atomically $ closeTBMQueue q | ||
75 | atomically $ closeTBMQueue thisChildOut | ||
76 | hClose hdl | ||
77 | ) | ||
78 | -- run opration on async | ||
79 | (\(_,q,_,(async1,async2)) -> do | ||
80 | let tid = asyncThreadId async1 | ||
81 | atomically $ writeTBMQueue postNewTChans (tid,q) | ||
82 | --link2 async1 async2 -- Do I need this? | ||
83 | waitBoth async1 async2 | ||
84 | ) | ||
85 | |||
86 | runConn :: Serial a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int | ||
87 | -> (Handle -> TBMQueue a -> IO ()) -> IO () | ||
88 | runConn hdl name q outq delay react = do | ||
89 | --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) | ||
90 | -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) | ||
91 | -- OnConnect Message... | ||
92 | |||
93 | race_ | ||
94 | -- continuously read q and output to handle (socket) | ||
95 | -- to terminate thread, close q | ||
96 | (do | ||
97 | let pending = fmap not (atomically $ isEmptyTBMQueue q) | ||
98 | closed = atomically . isClosedTBMQueue $ q | ||
99 | whileM_ (fmap not closed) $ do | ||
100 | whileM_ pending $ do | ||
101 | m <- atomically (readTBMQueue q) | ||
102 | case m of | ||
103 | Just m -> B.hPutStrLn hdl (runPutS $ R.serialize m) | ||
104 | -- Nothing means the Queue is closed and empty, so dont loop | ||
105 | Nothing -> return () | ||
106 | threadDelay delay | ||
107 | --B.hPutStrLn hdl (encode (quit (Just "Bye!")) ) | ||
108 | ) | ||
109 | |||
110 | -- continuously input from handle and | ||
111 | -- send to provided outq | ||
112 | (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq ) | ||
113 | |||
114 | |||