diff options
Diffstat (limited to 'KikiD/PortServer.hs')
-rw-r--r-- | KikiD/PortServer.hs | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/KikiD/PortServer.hs b/KikiD/PortServer.hs new file mode 100644 index 0000000..31101a7 --- /dev/null +++ b/KikiD/PortServer.hs | |||
@@ -0,0 +1,145 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE ViewPatterns #-} | ||
3 | {-# LANGUAGE BangPatterns #-} | ||
4 | module KikiD.PortServer (createTCPPortListener) where | ||
5 | |||
6 | import qualified Data.ByteString.Char8 as B | ||
7 | import Network.Socket hiding (send) | ||
8 | import Network.Socket.ByteString | ||
9 | import Data.Monoid ((<>)) | ||
10 | --import qualified Network.IRC as IRC | ||
11 | import Network.HTTP.Base (catchIO,catchIO_) | ||
12 | import Control.Concurrent.STM | ||
13 | import Control.Concurrent | ||
14 | import Control.Monad | ||
15 | import Control.Monad.Fix | ||
16 | import System.IO | ||
17 | import System.Directory (getAppUserDataDirectory) | ||
18 | import Text.Printf (printf) | ||
19 | import System.FilePath ((</>)) | ||
20 | import Control.Concurrent.STM.TBMQueue | ||
21 | import Control.Monad.Loops | ||
22 | import KikiD.Multiplex (pipeTransHookMicroseconds) | ||
23 | import Control.Exception | ||
24 | import Control.Concurrent.Async | ||
25 | import Data.Serialize | ||
26 | |||
27 | import Control.Arrow (second) | ||
28 | --import qualified Merv.GetLine as MG | ||
29 | |||
30 | {-instance Serialize IRC.Message where | ||
31 | put = putByteString . IRC.encode | ||
32 | get = do | ||
33 | x <- MG.getLine | ||
34 | case IRC.decode x of | ||
35 | Just x -> return x | ||
36 | Nothing -> fail ("IRC PARSE ERROR:'" <> B.unpack x <> "'") | ||
37 | |||
38 | |||
39 | createIRCPortListener :: PortNumber -> B.ByteString -> Int -> Int -> Int | ||
40 | -> TBMQueue (ThreadId,TBMQueue IRC.Message) -> TBMQueue IRC.Message -> IO () | ||
41 | createIRCPortListener port name delay qsize maxconns postNewTChans outq = | ||
42 | createTCPPortListener port name delay qsize maxconns postNewTChans outq ircReact | ||
43 | |||
44 | -} | ||
45 | |||
46 | createTCPPortListener :: Serialize a => PortNumber -> B.ByteString -> Int -> Int -> Int | ||
47 | -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a | ||
48 | -> (Handle -> TBMQueue a -> IO ()) -> IO () | ||
49 | createTCPPortListener port name delay qsize maxconns postNewTChans outq react = | ||
50 | bracket | ||
51 | -- aquire resources | ||
52 | (socket AF_INET Stream 0) | ||
53 | |||
54 | -- release resources | ||
55 | sClose | ||
56 | |||
57 | -- operate on resources | ||
58 | (\sock -> do | ||
59 | -- make socket immediately reusable - eases debugging. | ||
60 | setSocketOption sock ReuseAddr 1 | ||
61 | -- listen on TCP port 4242 | ||
62 | bindSocket sock (SockAddrInet port iNADDR_ANY) | ||
63 | -- allow a maximum of 15 outstanding connections | ||
64 | listen sock maxconns | ||
65 | sockAcceptLoop sock name delay qsize postNewTChans outq react | ||
66 | ) | ||
67 | |||
68 | sockAcceptLoop :: Serialize a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a | ||
69 | -> (Handle -> TBMQueue a -> IO ()) -> IO () | ||
70 | sockAcceptLoop listenSock name delay qsize postNewTChans outq react = | ||
71 | whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do | ||
72 | -- accept one connection and handle it | ||
73 | conn@(sock,_) <- accept listenSock | ||
74 | async $ bracket (do | ||
75 | -- acquire resources | ||
76 | hdl <- socketToHandle sock ReadWriteMode | ||
77 | q <- atomically $ newTBMQueue qsize | ||
78 | thisChildOut <- atomically $ newTBMQueue qsize | ||
79 | async1 <- async (runConn hdl name q thisChildOut delay react) | ||
80 | async2 <- async (pipeTransHookMicroseconds thisChildOut outq 5000 | ||
81 | (\() -> Just) -- no translation on outgoing | ||
82 | (\m -> return ())) | ||
83 | return (hdl,q,thisChildOut,(async1,async2)) | ||
84 | ) | ||
85 | -- release resources | ||
86 | (\(hdl,q,thisChildOut,(async1,async2)) -> do | ||
87 | cancel async1 | ||
88 | cancel async2 | ||
89 | atomically $ closeTBMQueue q | ||
90 | atomically $ closeTBMQueue thisChildOut | ||
91 | hClose hdl | ||
92 | ) | ||
93 | -- run opration on async | ||
94 | (\(_,q,_,(async1,async2)) -> do | ||
95 | let tid = asyncThreadId async1 | ||
96 | atomically $ writeTBMQueue postNewTChans (tid,q) | ||
97 | --link2 async1 async2 -- Do I need this? | ||
98 | waitBoth async1 async2 | ||
99 | ) | ||
100 | |||
101 | runConn :: Serialize a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int | ||
102 | -> (Handle -> TBMQueue a -> IO ()) -> IO () | ||
103 | runConn hdl name q outq delay react = do | ||
104 | --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) | ||
105 | -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")])) | ||
106 | -- OnConnect Message... | ||
107 | |||
108 | race_ | ||
109 | -- continuously read q and output to handle (socket) | ||
110 | -- to terminate thread, close q | ||
111 | (do | ||
112 | let pending = fmap not (atomically $ isEmptyTBMQueue q) | ||
113 | closed = atomically . isClosedTBMQueue $ q | ||
114 | whileM_ (fmap not closed) $ do | ||
115 | whileM_ pending $ do | ||
116 | m <- atomically (readTBMQueue q) | ||
117 | case m of | ||
118 | Just m -> B.hPutStrLn hdl (encode m) | ||
119 | -- Nothing means the Queue is closed and empty, so dont loop | ||
120 | Nothing -> return () | ||
121 | threadDelay delay | ||
122 | --B.hPutStrLn hdl (encode (quit (Just "Bye!")) ) | ||
123 | ) | ||
124 | |||
125 | -- continuously input from handle and | ||
126 | -- send to provided outq | ||
127 | (whileM_ (atomically . fmap not $ isClosedTBMQueue outq) $ react hdl outq ) | ||
128 | |||
129 | |||
130 | {- | ||
131 | ircReact hdl outq = do | ||
132 | line <- B.hGetLine hdl | ||
133 | -- debugging | ||
134 | dir <- getAppUserDataDirectory "merv" | ||
135 | tid <- myThreadId | ||
136 | let bQuit = (B.isPrefixOf "/quit") line | ||
137 | appendFile (dir </> "xdebug") | ||
138 | (printf "%s:%s\n(bQuit=%s) %s\n" (show tid) (show line) (show bQuit) (show $ IRC.parseMessage line)) | ||
139 | -- end debugging | ||
140 | case IRC.decode line of | ||
141 | Just (IRC.msg_command -> "QUIT") -> atomically $ closeTBMQueue outq | ||
142 | Just m -> atomically $ writeTBMQueue outq m | ||
143 | Nothing | "/q" `B.isPrefixOf` line -> atomically $ closeTBMQueue outq | ||
144 | _ -> return undefined | ||
145 | -} | ||