summaryrefslogtreecommitdiff
path: root/KikiD/PortServer.hs
blob: 31101a77f2deeb85d16ac95134865eb41e6f889d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE BangPatterns #-}
module KikiD.PortServer (createTCPPortListener) where

import qualified Data.ByteString.Char8 as B
import Network.Socket hiding (send)
import Network.Socket.ByteString
import Data.Monoid ((<>))
--import qualified Network.IRC as IRC
import Network.HTTP.Base (catchIO,catchIO_)
import Control.Concurrent.STM
import Control.Concurrent
import Control.Monad
import Control.Monad.Fix
import System.IO
import System.Directory (getAppUserDataDirectory)
import Text.Printf (printf)
import System.FilePath ((</>))
import Control.Concurrent.STM.TBMQueue
import Control.Monad.Loops
import KikiD.Multiplex (pipeTransHookMicroseconds)
import Control.Exception
import Control.Concurrent.Async
import Data.Serialize

import Control.Arrow (second)
--import qualified Merv.GetLine as MG

{-instance Serialize IRC.Message where
    put = putByteString . IRC.encode
    get = do
        x <- MG.getLine
        case IRC.decode x of
            Just x -> return x
            Nothing -> fail ("IRC PARSE ERROR:'" <> B.unpack x <> "'")


createIRCPortListener :: PortNumber -> B.ByteString -> Int -> Int -> Int
                   -> TBMQueue (ThreadId,TBMQueue IRC.Message) -> TBMQueue IRC.Message -> IO ()
createIRCPortListener port name delay qsize maxconns postNewTChans outq = 
  createTCPPortListener port name delay qsize maxconns postNewTChans outq ircReact

-}

-- | Open an IPv4 TCP listening socket and serve connections on it.
--
-- The socket is created, marked address-reusable, bound to @port@ on every
-- local interface ('iNADDR_ANY') and put into listening mode; control then
-- passes to 'sockAcceptLoop'.  'bracket' guarantees the listening socket is
-- closed again when the accept loop returns or throws.
--
-- Apart from @port@ and @maxconns@, all arguments are forwarded unchanged
-- to 'sockAcceptLoop'.
createTCPPortListener :: Serialize a
                      => PortNumber                       -- ^ TCP port to bind
                      -> B.ByteString                     -- ^ name, forwarded to 'sockAcceptLoop'
                      -> Int                              -- ^ delay, forwarded to 'sockAcceptLoop'
                      -> Int                              -- ^ queue size, forwarded to 'sockAcceptLoop'
                      -> Int                              -- ^ backlog handed to 'listen'
                      -> TBMQueue (ThreadId, TBMQueue a)  -- ^ queue on which new connections are announced
                      -> TBMQueue a                       -- ^ shared queue forwarded to 'sockAcceptLoop'
                      -> (Handle -> TBMQueue a -> IO ())  -- ^ per-connection reaction, forwarded as well
                      -> IO ()
createTCPPortListener port name delay qsize maxconns postNewTChans outq react =
    bracket openListener sClose serve
  where
    -- Acquire: a fresh stream socket using the default protocol.
    openListener = socket AF_INET Stream 0

    -- Use: configure the socket, then accept connections on it.
    serve listener = do
        -- Make the address immediately reusable after a restart; this
        -- eases debugging.
        setSocketOption listener ReuseAddr 1
        -- Bind to the requested port on all interfaces.
        bindSocket listener (SockAddrInet port iNADDR_ANY)
        -- Allow at most @maxconns@ outstanding (not yet accepted) connections.
        listen listener maxconns
        sockAcceptLoop listener name delay qsize postNewTChans outq react
 
-- | Accept connections on @listenSock@ until @postNewTChans@ is closed.
--
-- The closed-check runs before each blocking 'accept', so closing
-- @postNewTChans@ only takes effect once the pending 'accept' has returned.
--
-- Every accepted connection is served on its own 'async' thread, which
--
--   * turns the socket into a read\/write 'Handle',
--
--   * creates two bounded queues of capacity @qsize@: one that is announced
--     on @postNewTChans@ and one that is handed to the @react@ side of
--     'runConn',
--
--   * starts 'runConn' plus a 'pipeTransHookMicroseconds' worker connecting
--     the second queue to @outq@,
--
--   * posts @(thread id of the 'runConn' worker, first queue)@ on
--     @postNewTChans@ and waits for both workers, and
--
--   * finally cancels both workers, closes both queues and the handle.
sockAcceptLoop :: Serialize a
               => Socket
               -> B.ByteString
               -> Int
               -> Int
               -> TBMQueue (ThreadId, TBMQueue a)
               -> TBMQueue a
               -> (Handle -> TBMQueue a -> IO ())
               -> IO ()
sockAcceptLoop listenSock name delay qsize postNewTChans outq react =
    whileM_ stillAccepting $ do
        -- Accept one connection; the peer address is not needed.
        (clientSock, _) <- accept listenSock
        async $ bracket (setup clientSock) teardown serve
  where
    stillAccepting = fmap not . atomically $ isClosedTBMQueue postNewTChans

    -- Acquire the per-connection resources and start both workers.
    -- NOTE(review): this runs inside bracket's acquire phase, so the forked
    -- workers presumably inherit its masked state -- confirm that is intended.
    setup clientSock = do
        hdl        <- socketToHandle clientSock ReadWriteMode
        inbox      <- atomically (newTBMQueue qsize)
        outbox     <- atomically (newTBMQueue qsize)
        connWorker <- async (runConn hdl name inbox outbox delay react)
        -- Presumably forwards everything from outbox to outq; the helper
        -- lives in KikiD.Multiplex and its contract is not visible here.
        pipeWorker <- async (pipeTransHookMicroseconds outbox outq 5000
                                (\() -> Just)           -- no translation on outgoing
                                (\_ -> return ()))
        return (hdl, inbox, outbox, (connWorker, pipeWorker))

    -- Release: stop the workers first, then close the queues and the handle.
    teardown (hdl, inbox, outbox, (connWorker, pipeWorker)) = do
        cancel connWorker
        cancel pipeWorker
        mapM_ (atomically . closeTBMQueue) [inbox, outbox]
        hClose hdl

    -- Use: announce the connection, then block until both workers finish.
    serve (_, inbox, _, (connWorker, pipeWorker)) = do
        atomically $ writeTBMQueue postNewTChans (asyncThreadId connWorker, inbox)
        waitBoth connWorker pipeWorker
 
-- | Service one connection by racing a writer against a reader; as soon as
-- either of them finishes, 'race_' cancels the other and 'runConn' returns.
--
-- [writer] While @q@ is not closed: send every message currently waiting in
--   @q@ to @hdl@ (serialised with 'encode', one 'B.hPutStrLn' per message),
--   then sleep for @delay@ microseconds before polling again.  Closing @q@
--   terminates the writer.
--
-- [reader] While @outq@ is not closed: run @react hdl outq@.
--
-- The @name@ argument is currently unused; it was only referenced by an
-- on-connect greeting that had been commented out.
runConn :: Serialize a
        => Handle
        -> B.ByteString
        -> TBMQueue a
        -> TBMQueue a
        -> Int
        -> (Handle -> TBMQueue a -> IO ())
        -> IO ()
runConn hdl name q outq delay react = race_ writer reader
  where
    -- Drain whatever is queued, pause, and repeat until q gets closed.
    writer = whileM_ queueOpen $ do
        whileM_ hasPending sendNext
        threadDelay delay

    -- Keep reacting to input from the handle while outq is still open.
    reader = whileM_ sinkOpen (react hdl outq)

    queueOpen  = fmap not (atomically (isClosedTBMQueue q))
    hasPending = fmap not (atomically (isEmptyTBMQueue q))
    sinkOpen   = fmap not (atomically (isClosedTBMQueue outq))

    -- Take one message and write it out.  A Nothing result means the queue
    -- is closed and empty, in which case there is nothing to send.
    sendNext =
        atomically (readTBMQueue q)
            >>= maybe (return ()) (B.hPutStrLn hdl . encode)


{-
ircReact hdl outq = do
        line <- B.hGetLine hdl
        -- debugging
        dir <- getAppUserDataDirectory "merv"
        tid <- myThreadId
        let bQuit = (B.isPrefixOf "/quit") line
        appendFile (dir </> "xdebug") 
                   (printf "%s:%s\n(bQuit=%s) %s\n" (show tid) (show line) (show bQuit) (show $ IRC.parseMessage line))
        -- end debugging
        case IRC.decode line of
            Just (IRC.msg_command -> "QUIT") -> atomically $ closeTBMQueue outq
            Just m -> atomically $ writeTBMQueue outq m
            Nothing | "/q" `B.isPrefixOf` line -> atomically $ closeTBMQueue outq
            _ -> return undefined
-}