1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE BangPatterns #-}
module KikiD.PortServer (createTCPPortListener) where
import qualified Data.ByteString.Char8 as B
import Network.Socket hiding (send)
import Network.Socket.ByteString
import Data.Monoid ((<>))
--import qualified Network.IRC as IRC
import Network.HTTP.Base (catchIO,catchIO_)
import Control.Concurrent.STM
import Control.Concurrent
import Control.Monad
import Control.Monad.Fix
import System.IO
import System.Directory (getAppUserDataDirectory)
import Text.Printf (printf)
import System.FilePath ((</>))
import Control.Concurrent.STM.TBMQueue
import Control.Monad.Loops
import KikiD.Multiplex (pipeTransHookMicroseconds)
import Control.Exception
import Control.Concurrent.Async
import Data.Serialize
import Control.Arrow (second)
--import qualified Merv.GetLine as MG
{-instance Serialize IRC.Message where
put = putByteString . IRC.encode
get = do
x <- MG.getLine
case IRC.decode x of
Just x -> return x
Nothing -> fail ("IRC PARSE ERROR:'" <> B.unpack x <> "'")
createIRCPortListener :: PortNumber -> B.ByteString -> Int -> Int -> Int
-> TBMQueue (ThreadId,TBMQueue IRC.Message) -> TBMQueue IRC.Message -> IO ()
createIRCPortListener port name delay qsize maxconns postNewTChans outq =
createTCPPortListener port name delay qsize maxconns postNewTChans outq ircReact
-}
-- | Open an IPv4 TCP listening socket on @port@ and run 'sockAcceptLoop' on
-- it.  The socket is closed (via 'bracket') when the accept loop returns or
-- throws.
--
-- @maxconns@ is handed to 'listen' as the backlog.  Every other argument
-- (@name@, @delay@, @qsize@, @postNewTChans@, @outq@, @react@) is forwarded
-- unchanged to 'sockAcceptLoop'.
createTCPPortListener :: Serialize a => PortNumber -> B.ByteString -> Int -> Int -> Int
                      -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
                      -> (Handle -> TBMQueue a -> IO ()) -> IO ()
createTCPPortListener port name delay qsize maxconns postNewTChans outq react =
    bracket openListener sClose serve
  where
    -- Acquire: a fresh IPv4 stream socket (default protocol).
    openListener = socket AF_INET Stream 0

    -- Use: configure, bind and listen, then accept until the loop ends.
    serve listener = do
        -- Make the address immediately reusable; eases debugging restarts.
        setSocketOption listener ReuseAddr 1
        -- Bind to the requested port on every local IPv4 address.
        bindSocket listener (SockAddrInet port iNADDR_ANY)
        -- Let up to maxconns connections queue while awaiting accept.
        listen listener maxconns
        sockAcceptLoop listener name delay qsize postNewTChans outq react
-- | Accept connections on @listenSock@ while @postNewTChans@ is not closed,
-- servicing each accepted connection on its own 'Async'.
--
-- The closed check runs once per iteration, before each call to 'accept'.
-- For every connection two bounded queues of capacity @qsize@ are created and
-- two workers are started; the pair (connection worker's 'ThreadId', queue
-- the connection worker reads from) is then posted to @postNewTChans@.
sockAcceptLoop :: Serialize a => Socket -> B.ByteString -> Int -> Int -> TBMQueue (ThreadId,TBMQueue a) -> TBMQueue a
               -> (Handle -> TBMQueue a -> IO ()) -> IO ()
sockAcceptLoop listenSock name delay qsize postNewTChans outq react =
    whileM_ acceptingClients $ do
        -- Accept one connection; the peer address is not used.
        (clientSock, _) <- accept listenSock
        async (bracket (setup clientSock) teardown announceAndWait)
  where
    acceptingClients = fmap not (atomically (isClosedTBMQueue postNewTChans))

    -- Acquire: wrap the socket in a handle, create both per-connection
    -- queues and start both workers.
    setup clientSock = do
        h <- socketToHandle clientSock ReadWriteMode
        toClient <- atomically (newTBMQueue qsize)
        fromClient <- atomically (newTBMQueue qsize)
        connWorker <- async (runConn h name toClient fromClient delay react)
        -- Connects fromClient to the shared outq.  NOTE(review): 5000 is
        -- presumably a microsecond interval, going by the helper's name;
        -- confirm against KikiD.Multiplex.
        pipeWorker <- async (pipeTransHookMicroseconds fromClient outq 5000
                                (\() -> Just)        -- no translation on outgoing
                                (\_ -> return ()))   -- hook does nothing
        return (h, toClient, fromClient, (connWorker, pipeWorker))

    -- Release: stop both workers, close both queues, then close the handle.
    teardown (h, toClient, fromClient, (connWorker, pipeWorker)) = do
        cancel connWorker
        cancel pipeWorker
        atomically (closeTBMQueue toClient)
        atomically (closeTBMQueue fromClient)
        hClose h

    -- Use: announce the connection, then block until both workers finish.
    announceAndWait (_, toClient, _, (connWorker, pipeWorker)) = do
        atomically (writeTBMQueue postNewTChans (asyncThreadId connWorker, toClient))
        --link2 connWorker pipeWorker -- Do I need this?
        waitBoth connWorker pipeWorker
-- | Service one connection by racing a writer loop against a reader loop
-- with 'race_'; 'runConn' returns as soon as either loop finishes.
--
-- * Writer: while @q@ is not closed, write every message currently in @q@ to
--   @hdl@ ('encode'd, one per 'B.hPutStrLn'), then sleep for @delay@
--   (passed to 'threadDelay').  Closing @q@ ends this loop; the closed check
--   comes before draining, so messages still queued at that point are not
--   written.
--
-- * Reader: while @outq@ is not closed, run @react hdl outq@ repeatedly.
--
-- @name@ is currently unused; it is only referenced by the commented-out
-- on-connect greeting.
runConn :: Serialize a => Handle -> B.ByteString -> TBMQueue a -> TBMQueue a -> Int
        -> (Handle -> TBMQueue a -> IO ()) -> IO ()
runConn hdl name q outq delay react = race_ writer reader
    --send sock (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
    -- B.hPutStrLn hdl (encode (Message Nothing "NOTICE" ["*", ("Hi " <> name <> "!\n")]))
    -- OnConnect Message...
  where
    queueClosed  = atomically (isClosedTBMQueue q)
    queueHasData = fmap not (atomically (isEmptyTBMQueue q))

    -- Continuously move q's contents to the handle (socket).
    writer = whileM_ (fmap not queueClosed) $ do
        whileM_ queueHasData $
            -- Nothing means the queue is closed and empty: write nothing and
            -- let the emptiness check end this inner loop.
            atomically (readTBMQueue q)
                >>= maybe (return ()) (B.hPutStrLn hdl . encode)
        threadDelay delay
        --B.hPutStrLn hdl (encode (quit (Just "Bye!")) )

    -- Continuously let react consume input from the handle and feed outq.
    reader = whileM_ (fmap not (atomically (isClosedTBMQueue outq))) (react hdl outq)
{-
ircReact hdl outq = do
line <- B.hGetLine hdl
-- debugging
dir <- getAppUserDataDirectory "merv"
tid <- myThreadId
let bQuit = (B.isPrefixOf "/quit") line
appendFile (dir </> "xdebug")
(printf "%s:%s\n(bQuit=%s) %s\n" (show tid) (show line) (show bQuit) (show $ IRC.parseMessage line))
-- end debugging
case IRC.decode line of
Just (IRC.msg_command -> "QUIT") -> atomically $ closeTBMQueue outq
Just m -> atomically $ writeTBMQueue outq m
Nothing | "/q" `B.isPrefixOf` line -> atomically $ closeTBMQueue outq
_ -> return undefined
-}
|