summaryrefslogtreecommitdiff
path: root/KikiD/PortServer.hs
blob: b42e340d979e532ded191dab18d4df44379fb48e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE BangPatterns #-}
module KikiD.PortServer (createTCPPortListener) where

import qualified Data.ByteString.Char8 as B
import Network.Socket hiding (send)
import Network.Socket.ByteString
import Data.Monoid ((<>))
--import qualified Network.IRC as IRC
import Network.HTTP.Base (catchIO,catchIO_)
import Control.Concurrent.STM
import Control.Concurrent
import Control.Monad
import Control.Monad.Fix
import System.IO
import System.Directory (getAppUserDataDirectory)
import Text.Printf (printf)
import System.FilePath ((</>))
import Control.Concurrent.STM.TBMQueue
import Control.Monad.Loops
import KikiD.Multiplex (pipeTransHookMicroseconds)
import Control.Exception
import Control.Concurrent.Async
import Data.Bytes.Serial as R
import Data.Bytes.Put as Put

import Control.Arrow (second)


-- | Open a TCP listening socket and serve connections on it.
--
-- An IPv4 stream socket is created, marked address-reusable, bound to
-- @port@ on every local interface and put into listening mode; control is
-- then handed to 'sockAcceptLoop'.  'bracket' guarantees the listening
-- socket is closed when the accept loop returns or throws.
--
-- Arguments, in order:
--
--   * @port@          – TCP port to bind.
--   * @name@          – passed through to the per-connection handler.
--   * @delay@         – pause (microseconds, as given to 'threadDelay')
--                       between polls of a connection's queue.
--   * @qsize@         – capacity of each per-connection queue.
--   * @maxconns@      – backlog handed to 'listen'.
--   * @postNewTChans@ – queue on which every new connection is announced.
--   * @outq@          – shared queue passed on to the connection handlers.
--   * @react@         – action run repeatedly to read from a connection.
createTCPPortListener :: Serial a => PortNumber -> B.ByteString -> Int -> Int -> Int
                   -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a 
                   -> (Handle -> TBMQueue a -> IO ()) -> IO ()
createTCPPortListener port name delay qsize maxconns postNewTChans outq react =
    bracket openListener sClose serve
  where
    -- Acquire: a fresh, unbound IPv4 TCP socket.
    openListener = socket AF_INET Stream 0

    -- Use: configure the socket, then accept connections until told to stop.
    serve listener = do
        -- Make the address immediately reusable; eases restarts while debugging.
        setSocketOption listener ReuseAddr 1
        -- Bind to the requested port on all interfaces.
        bindSocket listener (SockAddrInet port iNADDR_ANY)
        -- Allow up to 'maxconns' connections to wait in the accept backlog.
        listen listener maxconns
        sockAcceptLoop listener name delay qsize postNewTChans outq react
 
-- | Accept connections on @listenSock@ for as long as @postNewTChans@ is
-- open, handling every accepted connection in its own 'async' thread.
--
-- For each connection the handler thread:
--
--   1. turns the socket into a read\/write 'Handle' and creates two bounded
--      queues of capacity @qsize@ (one inbound, one per-connection outbound);
--   2. starts 'runConn' on the handle, plus a 'pipeTransHookMicroseconds'
--      worker wired from the per-connection outbound queue to @outq@;
--   3. announces the connection by writing the 'runConn' thread id and the
--      inbound queue to @postNewTChans@;
--   4. waits for both workers; afterwards (or on an exception) 'bracket'
--      cancels the workers, closes both queues and closes the handle.
--
-- The open\/closed check is made only before each blocking 'accept'.
sockAcceptLoop :: Serial a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a 
                       -> (Handle -> TBMQueue a -> IO ()) -> IO ()
sockAcceptLoop listenSock name delay qsize postNewTChans outq react =
    whileM_ stillAccepting $ do
        -- Accept one connection; the peer address is not used.
        (clientSock, _) <- accept listenSock
        async $ bracket (openConn clientSock) closeConn serveConn
  where
    -- Keep looping until the announcement queue has been closed.
    stillAccepting = atomically $ fmap not (isClosedTBMQueue postNewTChans)

    -- Acquire the per-connection resources and start both workers.
    openConn clientSock = do
        hdl <- socketToHandle clientSock ReadWriteMode
        inbound <- atomically $ newTBMQueue qsize
        childOut <- atomically $ newTBMQueue qsize
        connWorker <- async (runConn hdl name inbound childOut delay react)
        forwarder <- async (pipeTransHookMicroseconds childOut outq 5000
                                (\() -> Just)         -- no translation on outgoing
                                (\_ -> return ()))
        return (hdl, inbound, childOut, (connWorker, forwarder))

    -- Release everything 'openConn' acquired.
    closeConn (hdl, inbound, childOut, (connWorker, forwarder)) = do
        cancel connWorker
        cancel forwarder
        atomically $ closeTBMQueue inbound
        atomically $ closeTBMQueue childOut
        hClose hdl

    -- Publish the connection, then block until both workers are done.
    -- NOTE(review): 'waitBoth' needs the forwarder to finish as well, so if
    -- 'runConn' ends normally the cleanup above is delayed until the
    -- forwarder stops on its own -- confirm that it does (the original
    -- author also wondered whether 'link2' was needed here).
    serveConn (_, inbound, _, (connWorker, forwarder)) = do
        atomically $ writeTBMQueue postNewTChans (asyncThreadId connWorker, inbound)
        waitBoth connWorker forwarder
 
-- | Service one connection by racing a writer against a reader; 'race_'
-- returns as soon as either side finishes.
--
--   * The writer polls @q@: while the queue is not closed it drains every
--     pending message, writing each one to @hdl@ in serialized form followed
--     by a newline, and then sleeps for @delay@ microseconds.  Closing @q@
--     is therefore the way to stop this side.
--   * The reader runs @react hdl outq@ over and over until @outq@ is closed.
--
-- @name@ is currently unused; it was intended for an on-connect greeting
-- that is not implemented.
runConn :: Serial a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int 
                       -> (Handle -> TBMQueue a -> IO ()) -> IO ()
runConn hdl name q outq delay react =
    race_ writer reader
  where
    queueClosed = atomically (isClosedTBMQueue q)
    hasPending  = fmap not (atomically (isEmptyTBMQueue q))

    -- Serialize one message and send it as a single line.
    sendMsg = B.hPutStrLn hdl . runPutS . R.serialize

    -- Queue -> socket.
    writer =
        whileM_ (fmap not queueClosed) $ do
            whileM_ hasPending $
                -- 'Nothing' means the queue is closed and empty: write nothing.
                atomically (readTBMQueue q) >>= maybe (return ()) sendMsg
            threadDelay delay

    -- Socket -> outq, delegated to the caller-supplied action.
    reader =
        whileM_ (atomically (fmap not (isClosedTBMQueue outq))) (react hdl outq)