diff options
author | joe <joe@jerkface.net> | 2013-07-14 02:12:52 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2013-07-14 02:12:52 -0400 |
commit | fa1b497fe08a85b79061ec663e78a869fcad3003 (patch) | |
tree | 018c013a0e6620c0ac7a0ce4aba05d583989f27b | |
parent | 9c52669ffde25a15066df74666fd2d8eb0bf0438 (diff) |
experimental module SendMessage
-rw-r--r-- | SendMessage.hs | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/SendMessage.hs b/SendMessage.hs new file mode 100644 index 00000000..c6eec01e --- /dev/null +++ b/SendMessage.hs | |||
@@ -0,0 +1,208 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE TypeFamilies #-} | ||
3 | module SendMessage where | ||
4 | |||
5 | import Control.Monad | ||
6 | import Control.Monad.Fix | ||
7 | import Control.Monad.IO.Class | ||
8 | import Control.Monad.Trans.Class | ||
9 | import Control.Monad.Trans.Maybe | ||
10 | import Control.Concurrent.STM | ||
11 | import Control.Concurrent (forkIO) | ||
12 | import Control.Concurrent.Async (async,waitSTM) | ||
13 | import Control.Exception as E | ||
14 | ( bracketOnError | ||
15 | , finally | ||
16 | ) | ||
17 | import GHC.Conc | ||
18 | ( threadStatus | ||
19 | , ThreadStatus(..) | ||
20 | , ThreadId | ||
21 | ) | ||
22 | import Data.IORef | ||
23 | import qualified Data.Map as Map | ||
24 | import Data.Map as Map (Map) | ||
25 | import qualified Data.Set as Set | ||
26 | import Data.Set as Set (Set,(\\)) | ||
27 | import Data.XML.Types as XML (Event) | ||
28 | import System.IO | ||
29 | ( BufferMode(..) | ||
30 | , IOMode(..) | ||
31 | , hSetBuffering | ||
32 | ) | ||
33 | import Network.BSD | ||
34 | ( getProtocolNumber | ||
35 | ) | ||
36 | import Network.Socket | ||
37 | ( Family | ||
38 | , connect | ||
39 | , socketToHandle | ||
40 | , sClose | ||
41 | , Socket(..) | ||
42 | , socket | ||
43 | , SocketType(..) | ||
44 | ) | ||
45 | |||
46 | import Todo | ||
47 | import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily) | ||
48 | import SocketLike | ||
49 | import ServerC (packetSink) | ||
50 | import ControlMaybe (handleIO,handleIO_) | ||
51 | import Data.Conduit (Sink,Source) | ||
52 | import qualified Data.ByteString as S (ByteString) | ||
53 | import XMLToByteStrings | ||
54 | |||
55 | type ByteStringSink = Data.Conduit.Sink S.ByteString IO () | ||
56 | |||
57 | type OutgoingToPeer sock cache msg = | ||
58 | sock | ||
59 | -> cache | ||
60 | -> TChan msg | ||
61 | -> ByteStringSink | ||
62 | -> IO (Maybe msg) | ||
63 | |||
64 | class CacheableCommand cmd where | ||
65 | type CacheRef cmd :: * | ||
66 | type Cache cmd :: * | ||
67 | |||
68 | emptyCache :: CacheRef cmd | ||
69 | |||
70 | cache :: cmd -> CacheRef cmd -> IO () | ||
71 | |||
72 | interpretCommands :: SocketLike sock => | ||
73 | sock -> Cache cmd -> TChan cmd -> ByteStringSink -> IO (Maybe cmd) | ||
74 | |||
75 | |||
76 | sendMessage :: | ||
77 | (cache, msg -> IORef cache -> IO (), OutBoundXML RestrictedSocket cache msg) -> TVar (Map Peer (TChan msg, ThreadId)) -> msg -> Peer -> IO () | ||
78 | sendMessage (newCache,cacheCmd,toPeer) cons msg peer0 = do | ||
79 | let peer = discardPort peer0 | ||
80 | found <- atomically $ do | ||
81 | consmap <- readTVar cons | ||
82 | return (Map.lookup peer consmap) | ||
83 | let newEntry = do | ||
84 | chan <- atomically newTChan | ||
85 | t <- forkIO $ connect_to_server chan peer (newCache,cacheCmd,toPeer) | ||
86 | -- L.putStrLn $ "remote-map new: " <++> showPeer peer | ||
87 | return (True,(chan,t)) | ||
88 | (is_new,entry) <- maybe newEntry | ||
89 | ( \(chan,t) -> do | ||
90 | st <- threadStatus t | ||
91 | let running = do | ||
92 | -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer | ||
93 | return (False,(chan,t)) | ||
94 | died = do | ||
95 | -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer | ||
96 | newEntry | ||
97 | case st of | ||
98 | ThreadRunning -> running | ||
99 | ThreadBlocked _ -> running | ||
100 | ThreadDied -> died | ||
101 | ThreadFinished -> died | ||
102 | ) | ||
103 | found | ||
104 | -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg | ||
105 | atomically $ writeTChan (fst entry) msg | ||
106 | when is_new . atomically $ | ||
107 | readTVar cons >>= writeTVar cons . Map.insert peer entry | ||
108 | |||
109 | |||
110 | connect_to_server chan peer (newCache,cacheCmd,toPeer) = (>> return ()) . runMaybeT $ do | ||
111 | let port = 5269 :: Int | ||
112 | -- We'll cache Presence notifications until the socket | ||
113 | -- is ready. | ||
114 | cached <- liftIO $ newIORef newCache | ||
115 | |||
116 | fix $ \sendmsgs -> do | ||
117 | connected <- liftIO . async $ connect' (peerAddr peer) port | ||
118 | |||
119 | sock <- MaybeT . fix $ \loop -> do | ||
120 | e <- atomically $ orElse | ||
121 | (fmap Right $ waitSTM connected) | ||
122 | (fmap Left $ readTChan chan) | ||
123 | case e of | ||
124 | Left cmd -> cacheCmd cmd cached >> loop | ||
125 | Right sock -> return sock | ||
126 | |||
127 | retry <- do | ||
128 | (cache,snk) <- liftIO $ do | ||
129 | h <- socketToHandle sock ReadWriteMode | ||
130 | hSetBuffering h NoBuffering | ||
131 | cache <- readIORef $ cached | ||
132 | -- hint garbage collector: we're done with this... | ||
133 | writeIORef cached newCache | ||
134 | return (cache,packetSink h) | ||
135 | MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk | ||
136 | |||
137 | liftIO $ cacheCmd retry cached | ||
138 | -- liftIO $ putStrLn $ "retrying " ++ show retry | ||
139 | sendmsgs | ||
140 | |||
141 | connect' :: SockAddr -> Int -> IO (Maybe Socket) | ||
142 | connect' addr port = do | ||
143 | proto <- getProtocolNumber "tcp" | ||
144 | {- | ||
145 | -- Given (host :: HostName) ... | ||
146 | let hints = defaultHints { addrFlags = [AI_ADDRCONFIG] | ||
147 | , addrProtocol = proto | ||
148 | , addrSocketType = Stream } | ||
149 | addrs <- getAddrInfo (Just hints) (Just host) (Just serv) | ||
150 | firstSuccessful $ map tryToConnect addrs | ||
151 | -} | ||
152 | let getport (SockAddrInet port _) = port | ||
153 | getport (SockAddrInet6 port _ _ _) = port | ||
154 | let doException e = do | ||
155 | -- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e | ||
156 | return Nothing | ||
157 | handleIO doException | ||
158 | $ tryToConnect proto (addr `withPort` port) | ||
159 | where | ||
160 | tryToConnect proto addr = | ||
161 | bracketOnError | ||
162 | (socket (socketFamily addr) Stream proto) | ||
163 | (sClose ) -- only done if there's an error | ||
164 | (\sock -> do | ||
165 | connect sock addr | ||
166 | return (Just sock) -- socketToHandle sock ReadWriteMode | ||
167 | ) | ||
168 | |||
169 | |||
170 | {- | ||
171 | mmInsert val key mm = Map.alter f key mm | ||
172 | where | ||
173 | f Nothing = Just $ Set.singleton val | ||
174 | f (Just set) = Just $ Set.insert val set | ||
175 | -} | ||
176 | |||
177 | -- newCache = todo | ||
178 | cacheCmd :: msg -> cache -> IO () | ||
179 | cacheCmd _ cached = todo | ||
180 | -- toPeer = todo | ||
181 | |||
182 | type OutBoundXML sock cache msg = | ||
183 | sock | ||
184 | -> cache | ||
185 | -> TChan msg | ||
186 | -> (Maybe msg -> IO ()) | ||
187 | -> Source IO [XML.Event] | ||
188 | |||
189 | handleOutgoingToPeer | ||
190 | :: SocketLike sock => | ||
191 | OutBoundXML sock cache msg | ||
192 | -> sock | ||
193 | -> cache | ||
194 | -> TChan msg | ||
195 | -> ByteStringSink | ||
196 | -> IO (Maybe msg) | ||
197 | handleOutgoingToPeer toPeer sock cache chan snk = do | ||
198 | p <- getPeerName sock | ||
199 | -- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p) | ||
200 | failed <- newIORef Nothing | ||
201 | let failure cmd = do | ||
202 | writeIORef failed cmd | ||
203 | -- putStrLn $ "Failed: " ++ show cmd | ||
204 | finally ( | ||
205 | handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk | ||
206 | ) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p) | ||
207 | readIORef failed | ||
208 | |||