1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE BangPatterns #-}
module KikiD.PortServer (createTCPPortListener) where
import qualified Data.ByteString.Char8 as B
import Network.Socket hiding (send)
import Network.Socket.ByteString
import Data.Monoid ((<>))
--import qualified Network.IRC as IRC
import Network.HTTP.Base (catchIO,catchIO_)
import Control.Concurrent.STM
import Control.Concurrent
import Control.Monad
import Control.Monad.Fix
import System.IO
import System.Directory (getAppUserDataDirectory)
import Text.Printf (printf)
import System.FilePath ((</>))
import Control.Concurrent.STM.TBMQueue
import Control.Monad.Loops
import KikiD.Multiplex (pipeTransHookMicroseconds)
import Control.Exception
import Control.Concurrent.Async
import Data.Bytes.Serial as R
import Data.Bytes.Put as Put
import Control.Arrow (second)
-- | Open an IPv4 TCP listening socket on @port@ and serve connections on it.
--
-- The socket is created and released with 'bracket', so it is closed whether
-- the accept loop returns normally or exits with an exception.
--
-- Arguments:
--
-- * @port@          - TCP port to bind (on 'iNADDR_ANY').
-- * @name@          - passed through to 'sockAcceptLoop'.
-- * @delay@         - passed through to 'sockAcceptLoop'.
-- * @qsize@         - passed through to 'sockAcceptLoop'.
-- * @maxconns@      - backlog handed to 'listen'.
-- * @postNewTChans@ - passed through to 'sockAcceptLoop'.
-- * @outq@          - passed through to 'sockAcceptLoop'.
-- * @react@         - passed through to 'sockAcceptLoop'.
createTCPPortListener :: Serial a => PortNumber -> B.ByteString -> Int -> Int -> Int
                      -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
                      -> (Handle -> TBMQueue a -> IO ()) -> IO ()
createTCPPortListener port name delay qsize maxconns postNewTChans outq react =
    bracket openListener sClose serve
  where
    -- Acquire: a fresh, unbound stream socket.
    openListener = socket AF_INET Stream 0

    -- Operate: configure the socket, then hand it to the accept loop.
    serve listener = do
        -- Make the address immediately reusable; eases debugging restarts.
        setSocketOption listener ReuseAddr 1
        -- Bind to the requested port.
        bindSocket listener (SockAddrInet port iNADDR_ANY)
        -- Start listening with a backlog of 'maxconns'.
        listen listener maxconns
        sockAcceptLoop listener name delay qsize postNewTChans outq react
-- | Accept connections on @listenSock@ until @postNewTChans@ is closed.
--
-- The closed-check happens before each blocking 'accept', so closing
-- @postNewTChans@ only takes effect once the current 'accept' returns.
--
-- Each accepted connection is served on its own thread, which:
--
-- 1. converts the socket to a 'Handle' and creates two bounded queues of
--    capacity @qsize@ (@q@ for messages to the peer, @thisChildOut@ for
--    messages handed over by @react@);
-- 2. starts 'runConn' on the handle and a 'pipeTransHookMicroseconds' worker
--    from @thisChildOut@ into the shared @outq@;
-- 3. announces @(threadId of the runConn worker, q)@ on @postNewTChans@;
-- 4. waits for both workers, and on any exit cancels them, closes both
--    queues and closes the handle.
--
-- @name@, @delay@ and @react@ are passed through to 'runConn'.
sockAcceptLoop :: Serial a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
               -> (Handle -> TBMQueue a -> IO ()) -> IO ()
sockAcceptLoop listenSock name delay qsize postNewTChans outq react =
    whileM_ (atomically $ fmap not (isClosedTBMQueue postNewTChans)) $ do
        -- accept one connection and handle it on a dedicated thread
        (sock,_) <- accept listenSock
        void . async $ bracket (acquire sock) release serve
  where
    -- acquire resources
    acquire sock = do
        hdl <- socketToHandle sock ReadWriteMode
        q <- atomically $ newTBMQueue qsize
        thisChildOut <- atomically $ newTBMQueue qsize
        -- 'bracket' runs this action with asynchronous exceptions masked and
        -- forked threads inherit the masking state.  Unmask explicitly so the
        -- workers (and the user-supplied 'react') stay promptly cancellable.
        async1 <- asyncWithUnmask (\unmask ->
                      unmask (runConn hdl name q thisChildOut delay react))
        async2 <- asyncWithUnmask (\unmask ->
                      unmask (pipeTransHookMicroseconds thisChildOut outq 5000
                                  (\() -> Just)          -- no translation on outgoing
                                  (\_ -> return ())))
        return (hdl,q,thisChildOut,(async1,async2))

    -- release resources
    release (hdl,q,thisChildOut,(async1,async2)) = do
        cancel async1
        cancel async2
        atomically $ closeTBMQueue q
        atomically $ closeTBMQueue thisChildOut
        hClose hdl

    -- run operation on the workers
    serve (_,q,_,(async1,async2)) = do
        let tid = asyncThreadId async1
        atomically $ writeTBMQueue postNewTChans (tid,q)
        --link2 async1 async2 -- Do I need this?
        waitBoth async1 async2
-- | Service one connection by racing a writer against a reader; as soon as
-- either finishes, 'race_' cancels the other and 'runConn' returns.
--
-- * Writer: while @q@ is not closed, write every message currently pending
--   in @q@ to @hdl@ (serialized, followed by a newline), then sleep for
--   @delay@ microseconds.  To terminate it, close @q@.
-- * Reader: while @outq@ is not closed, repeatedly run @react hdl outq@.
--
-- @name@ is currently unused (reserved for an on-connect greeting).
runConn :: Serial a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int
        -> (Handle -> TBMQueue a -> IO ()) -> IO ()
runConn hdl name q outq delay react =
    --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
    -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
    -- OnConnect Message...
    race_ writer reader
  where
    -- Has the inbound-to-socket queue been closed?
    queueClosed = atomically (isClosedTBMQueue q)

    -- Is there at least one message waiting in the queue?
    queueHasData = atomically (fmap not (isEmptyTBMQueue q))

    -- Serialize one message and write it, newline-terminated, to the handle.
    emit msg = B.hPutStrLn hdl (runPutS (R.serialize msg))

    -- Write out everything that is pending right now.  A 'Nothing' from
    -- 'readTBMQueue' means the queue is closed and empty; nothing to write.
    drainPending =
        whileM_ queueHasData $
            atomically (readTBMQueue q) >>= maybe (return ()) emit

    -- Poll the queue until it is closed, pausing between sweeps.
    writer = whileM_ (fmap not queueClosed) $ do
        drainPending
        threadDelay delay
        --B.hPutStrLn hdl (encode (quit (Just "Bye!")) )

    -- Keep feeding input from the handle to the provided outq.
    reader =
        whileM_ (atomically (fmap not (isClosedTBMQueue outq))) (react hdl outq)
|