summaryrefslogtreecommitdiff
path: root/KikiD
diff options
context:
space:
mode:
Diffstat (limited to 'KikiD')
-rw-r--r--KikiD/GetLine.hs18
-rw-r--r--KikiD/Message.hs18
-rw-r--r--KikiD/Multiplex.hs100
-rw-r--r--KikiD/PortServer.hs145
4 files changed, 281 insertions, 0 deletions
diff --git a/KikiD/GetLine.hs b/KikiD/GetLine.hs
new file mode 100644
index 0000000..8af5dc6
--- /dev/null
+++ b/KikiD/GetLine.hs
@@ -0,0 +1,18 @@
1module KikiD.GetLine where
2
3import Control.Monad
4import Data.Serialize
5import qualified Data.ByteString as BS
6import qualified Data.ByteString.Lazy as L
7import Data.Monoid
8import Data.Binary.Builder
9
10getLine :: Get BS.ByteString
11getLine = getWords empty
12 where
13 getWords b = do
14 w <- getWord8
15 let x = singleton w
16 if (w == 10 || w == 0)
17 then return $ BS.concat . L.toChunks . toLazyByteString $ b <> x
18 else getWords (b <> x)
diff --git a/KikiD/Message.hs b/KikiD/Message.hs
new file mode 100644
index 0000000..c4903cc
--- /dev/null
+++ b/KikiD/Message.hs
@@ -0,0 +1,18 @@
1module KikiD.Message where
2
3import Data.Serialize
4import qualified KikiD.GetLine
5import qualified Data.ByteString.Char8 as B
6import Data.Monoid
7import Text.Read
8
9data KikiDMessage = TODO deriving (Show,Read)
10
11instance Serialize KikiDMessage where
12 put = putByteString . B.pack . show
13 get = do
14 x <- KikiD.GetLine.getLine
15 case (readEither (B.unpack x) :: Either String KikiDMessage) of
16 Right m -> return m
17 Left er -> fail ("PARSE ERROR: " <> er)
18
diff --git a/KikiD/Multiplex.hs b/KikiD/Multiplex.hs
new file mode 100644
index 0000000..4a31127
--- /dev/null
+++ b/KikiD/Multiplex.hs
@@ -0,0 +1,100 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE ViewPatterns #-}
3{-# LANGUAGE TupleSections #-}
4{-# LANGUAGE StandaloneDeriving #-}
5{-# LANGUAGE GeneralizedNewtypeDeriving #-}
6{-# LANGUAGE DeriveGeneric #-}
7{-# LANGUAGE BangPatterns #-}
8module KikiD.Multiplex where
9
10import System.IO
11import qualified Data.ByteString.Char8 as B
12import Data.Monoid
13import Control.Concurrent.STM
14import Data.Map.Strict as M
15import Control.Monad
16import Control.Concurrent
17import qualified Data.Binary as Bin
18import Control.Concurrent.STM.TBMQueue
19import Control.Monad.Loops
20import Data.List
21import Data.Maybe
22
23-- | pipeTransHookMicroseconds
24--
25-- This function indefinitely reads the @fromChan@ queue and applies
26-- the function @translate@ to the contents before passing it on to the
27-- @toChan@ queue. The @triggerAction@ is performed on the message prior
28-- to the translation. The @fromChan@ queue is checked every @micros@
29-- microseconds from the last emptying.
30--
31-- To terminate the thread, close @fromChan@ queue.
32--
33pipeTransHookMicroseconds :: TBMQueue a -> TBMQueue b -> Int -> (x -> a -> Maybe b) -> (a -> IO x) -> IO ()
34pipeTransHookMicroseconds fromChan toChan micros translate triggerAction =
35 whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do
36 whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do
37 msg <- atomically $ readTBMQueue fromChan
38 case msg of
39 Just m' -> do
40 x <- triggerAction m'
41 case translate x m' of
42 Just m -> atomically $ writeTBMQueue toChan m
43 _ -> return ()
44 _ -> return ()
45 threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads
46
47pipeTransHook fromChan toChan translate triggerAction =
48 pipeTransHookMicroseconds fromChan toChan 5000 translate triggerAction
49
50pipeTrans fromChan toChan translate =
51 pipeTransHook fromChan toChan translate (void . return)
52
53pipeHook fromChan toChan triggerAction =
54 pipeTransHook fromChan toChan id triggerAction
55
56pipeQueue fromChan toChan =
57 pipeTransHookMicroseconds fromChan toChan 5000 (\() -> Just) (void . return)
58
59teePipeQueueMicroseconds fromChan toChan1 toChan2 micros =
60 whileM_ (fmap not (atomically $ isClosedTBMQueue fromChan)) $ do
61 whileM_ (fmap not (atomically $ isEmptyTBMQueue fromChan)) $ do
62 msg <- atomically $ readTBMQueue fromChan
63 case msg of
64 Just m -> do
65 atomically $ writeTBMQueue toChan1 m
66 atomically $ writeTBMQueue toChan2 m
67 _ -> return ()
68 threadDelay micros -- 5000 -- yield two 100ths of a second, for other threads
69
70teePipeQueue fromChan toChan1 toChan2 =
71 teePipeQueueMicroseconds fromChan toChan1 toChan2 5000
72
73
74-- Deprecated: Use consumeQueueMicroseconds
75-- TODO: Remove this
76withQueueMicroseconds fromChan action delay = whileM_ (atomically . fmap not $ isClosedTBMQueue fromChan) $ do
77 whileM_ (atomically . fmap not $ isEmptyTBMQueue fromChan) $ do
78 t <- atomically $ readTBMQueue fromChan
79 case t of
80 Just x -> action x
81 Nothing -> return ()
82 threadDelay delay
83
84{-# ANN withQueue ("HLint: Ignore Eta reduce"::String) #-}
85withQueue fromchan action = consumeQueueMicroseconds fromchan 5000 action
86{-# DEPRECATED withQueueMicroseconds, withQueue "Use consumeQueueMicroseconds" #-}
87
88-- | consumeQueueMicroseconds
89-- (as of version 1.0.4)
90--
91-- Continously run the provided action on items
92-- from the provided queue. Delay for provided
93-- microseconds each time the queue is emptied.
94consumeQueueMicroseconds q micros action = whileM_ (atomically . fmap not $ isClosedTBMQueue q) $ do
95 whileM_ (atomically . fmap not $ isEmptyTBMQueue q) $ do
96 x <- atomically $ readTBMQueue q
97 case x of
98 Just s -> action s
99 Nothing -> return ()
100 threadDelay micros
diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs
new file mode 100644
index 0000000..31101a7
--- /dev/null
+++ b/KikiD/PortServer.hs
@@ -0,0 +1,145 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE ViewPatterns #-}
3{-# LANGUAGE BangPatterns #-}
4module KikiD.PortServer (createTCPPortListener) where
5
6import qualified Data.ByteString.Char8 as B
7import Network.Socket hiding (send)
8import Network.Socket.ByteString
9import Data.Monoid ((<>))
10--import qualified Network.IRC as IRC
11import Network.HTTP.Base (catchIO,catchIO_)
12import Control.Concurrent.STM
13import Control.Concurrent
14import Control.Monad
15import Control.Monad.Fix
16import System.IO
17import System.Directory (getAppUserDataDirectory)
18import Text.Printf (printf)
19import System.FilePath ((</>))
20import Control.Concurrent.STM.TBMQueue
21import Control.Monad.Loops
22import KikiD.Multiplex (pipeTransHookMicroseconds)
23import Control.Exception
24import Control.Concurrent.Async
25import Data.Serialize
26
27import Control.Arrow (second)
28--import qualified Merv.GetLine as MG
29
30{-instance Serialize IRC.Message where
31 put = putByteString . IRC.encode
32 get = do
33 x <- MG.getLine
34 case IRC.decode x of
35 Just x -> return x
36 Nothing -> fail ("IRC PARSE ERROR:'" <> B.unpack x <> "'")
37
38
39createIRCPortListener :: PortNumber -> B.ByteString -> Int -> Int -> Int
40 -> TBMQueue (ThreadId,TBMQueue IRC.Message) -> TBMQueue IRC.Message -> IO ()
41createIRCPortListener port name delay qsize maxconns postNewTChans outq =
42 createTCPPortListener port name delay qsize maxconns postNewTChans outq ircReact
43
44-}
45
46createTCPPortListener :: Serialize a => PortNumber -> B.ByteString -> Int -> Int -> Int
47 -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
48 -> (Handle -> TBMQueue a -> IO ()) -> IO ()
49createTCPPortListener port name delay qsize maxconns postNewTChans outq react =
50 bracket
51 -- aquire resources
52 (socket AF_INET Stream 0)
53
54 -- release resources
55 sClose
56
57 -- operate on resources
58 (\sock -> do
59 -- make socket immediately reusable - eases debugging.
60 setSocketOption sock ReuseAddr 1
61 -- listen on TCP port 4242
62 bindSocket sock (SockAddrInet port iNADDR_ANY)
63 -- allow a maximum of 15 outstanding connections
64 listen sock maxconns
65 sockAcceptLoop sock name delay qsize postNewTChans outq react
66 )
67
68sockAcceptLoop :: Serialize a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
69 -> (Handle -> TBMQueue a -> IO ()) -> IO ()
70sockAcceptLoop listenSock name delay qsize postNewTChans outq react =
71 whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do
72 -- accept one connection and handle it
73 conn@(sock,_) <- accept listenSock
74 async $ bracket (do
75 -- acquire resources
76 hdl <- socketToHandle sock ReadWriteMode
77 q <- atomically $ newTBMQueue qsize
78 thisChildOut <- atomically $ newTBMQueue qsize
79 async1 <- async (runConn hdl name q thisChildOut delay react)
80 async2 <- async (pipeTransHookMicroseconds thisChildOut outq 5000
81 (\() -> Just) -- no translation on outgoing
82 (\m -> return ()))
83 return (hdl,q,thisChildOut,(async1,async2))
84 )
85 -- release resources
86 (\(hdl,q,thisChildOut,(async1,async2)) -> do
87 cancel async1
88 cancel async2
89 atomically $ closeTBMQueue q
90 atomically $ closeTBMQueue thisChildOut
91 hClose hdl
92 )
93 -- run opration on async
94 (\(_,q,_,(async1,async2)) -> do
95 let tid = asyncThreadId async1
96 atomically $ writeTBMQueue postNewTChans (tid,q)
97 --link2 async1 async2 -- Do I need this?
98 waitBoth async1 async2
99 )
100
101runConn :: Serialize a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int
102 -> (Handle -> TBMQueue a -> IO ()) -> IO ()
103runConn hdl name q outq delay react = do
104 --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
105 -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
106 -- OnConnect Message...
107
108 race_
109 -- continuously read q and output to handle (socket)
110 -- to terminate thread, close q
111 (do
112 let pending = fmap not (atomically $ isEmptyTBMQueue q)
113 closed = atomically . isClosedTBMQueue $ q
114 whileM_ (fmap not closed) $ do
115 whileM_ pending $ do
116 m <- atomically (readTBMQueue q)
117 case m of
118 Just m -> B.hPutStrLn hdl (encode m)
119 -- Nothing means the Queue is closed and empty, so dont loop
120 Nothing -> return ()
121 threadDelay delay
122 --B.hPutStrLn hdl (encode (quit (Just "Bye!")) )
123 )
124
125 -- continuously input from handle and
126 -- send to provided outq
127 (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq )
128
129
130{-
131ircReact hdl outq = do
132 line <- B.hGetLine hdl
133 -- debugging
134 dir <- getAppUserDataDirectory "merv"
135 tid <- myThreadId
136 let bQuit = (B.isPrefixOf "/quit") line
137 appendFile (dir </> "xdebug")
138 (printf "%s:%s\n(bQuit=%s) %s\n" (show tid) (show line) (show bQuit) (show $ IRC.parseMessage line))
139 -- end debugging
140 case IRC.decode line of
141 Just (IRC.msg_command -> "QUIT") -> atomically $ closeTBMQueue outq
142 Just m -> atomically $ writeTBMQueue outq m
143 Nothing | "/q" `B.isPrefixOf` line -> atomically $ closeTBMQueue outq
144 _ -> return undefined
145-}