diff options
author | joe <joe@jerkface.net> | 2013-07-14 03:36:43 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2013-07-14 03:36:43 -0400 |
commit | a400eb11dcd5d8e3a94f2d3505a6b66c63cf54f5 (patch) | |
tree | bbf56c74adaf35d014c73f718424a3a7d864a5ed /Presence/SendMessage.hs | |
parent | 36339ba1369717858639299a323bd0dac7f95c51 (diff) |
Factored the cache and retry pattern into module SendMessage
Diffstat (limited to 'Presence/SendMessage.hs')
-rw-r--r-- | Presence/SendMessage.hs | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/Presence/SendMessage.hs b/Presence/SendMessage.hs new file mode 100644 index 00000000..6e8ea2b9 --- /dev/null +++ b/Presence/SendMessage.hs | |||
@@ -0,0 +1,208 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE TypeFamilies #-} | ||
3 | module SendMessage | ||
4 | ( sendMessage | ||
5 | , CommandCache(..) | ||
6 | , newOutgoingConnections | ||
7 | , OutgoingConnections | ||
8 | ) where | ||
9 | |||
10 | import Control.Monad | ||
11 | import Control.Monad.Fix | ||
12 | import Control.Monad.IO.Class | ||
13 | import Control.Monad.Trans.Maybe | ||
14 | import Control.Concurrent.STM | ||
15 | import Control.Concurrent (forkIO) | ||
16 | import Control.Concurrent.Async (async,waitSTM) | ||
17 | import Control.Exception as E | ||
18 | ( bracketOnError | ||
19 | , finally | ||
20 | ) | ||
21 | import GHC.Conc | ||
22 | ( threadStatus | ||
23 | , ThreadStatus(..) | ||
24 | , ThreadId | ||
25 | ) | ||
26 | import Data.IORef | ||
27 | import qualified Data.Map as Map | ||
28 | import Data.Map as Map (Map) | ||
29 | import Data.XML.Types as XML (Event) | ||
30 | import System.IO | ||
31 | ( BufferMode(..) | ||
32 | , IOMode(..) | ||
33 | , hSetBuffering | ||
34 | ) | ||
35 | import Network.BSD | ||
36 | ( getProtocolNumber | ||
37 | ) | ||
38 | import Network.Socket | ||
39 | ( connect | ||
40 | , socketToHandle | ||
41 | , sClose | ||
42 | , Socket(..) | ||
43 | , socket | ||
44 | , SocketType(..) | ||
45 | ) | ||
46 | |||
47 | import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily) | ||
48 | import SocketLike | ||
49 | import ServerC (packetSink) | ||
50 | import ControlMaybe (handleIO,handleIO_) | ||
51 | import Data.Conduit (Sink,Source) | ||
52 | import qualified Data.ByteString as S (ByteString) | ||
53 | import XMLToByteStrings | ||
54 | |||
55 | type ByteStringSink = Sink S.ByteString IO () | ||
56 | |||
57 | -- |Strict version of 'modifyIORef' | ||
58 | modifyIORef' :: IORef a -> (a -> a) -> IO () | ||
59 | modifyIORef' ref f = do | ||
60 | x <- readIORef ref | ||
61 | let x' = f x | ||
62 | x' `seq` writeIORef ref x' | ||
63 | |||
64 | |||
65 | class CommandCache cache where | ||
66 | type CacheableCommand cache | ||
67 | |||
68 | emptyCache :: cache | ||
69 | |||
70 | updateCache :: CacheableCommand cache -> cache -> cache | ||
71 | |||
72 | |||
73 | data OutgoingConnections cache = | ||
74 | OutgoingConnections (TVar (Map Peer (TChan (CacheableCommand cache), ThreadId))) | ||
75 | (OutBoundXML RestrictedSocket cache (CacheableCommand cache)) | ||
76 | |||
77 | |||
78 | newOutgoingConnections :: | ||
79 | ( RestrictedSocket | ||
80 | -> cache | ||
81 | -> TChan (CacheableCommand cache) | ||
82 | -> (Maybe (CacheableCommand cache) -> IO ()) | ||
83 | -> Source IO [XML.Event] ) | ||
84 | |||
85 | -> STM (OutgoingConnections cache) | ||
86 | |||
87 | newOutgoingConnections interpretCommands = do | ||
88 | remotes <- newTVar (Map.empty) | ||
89 | return (OutgoingConnections remotes interpretCommands) | ||
90 | |||
91 | |||
92 | |||
93 | sendMessage | ||
94 | :: CommandCache a => | ||
95 | OutgoingConnections a -> CacheableCommand a -> Peer -> IO () | ||
96 | sendMessage (OutgoingConnections cons interpretCommands) msg peer0 = do | ||
97 | let peer = discardPort peer0 | ||
98 | found <- atomically $ do | ||
99 | consmap <- readTVar cons | ||
100 | return (Map.lookup peer consmap) | ||
101 | let newEntry = do | ||
102 | chan <- atomically newTChan | ||
103 | t <- forkIO $ connect_to_server chan peer interpretCommands -- (newCache,updateCache,toPeer) | ||
104 | -- L.putStrLn $ "remote-map new: " <++> showPeer peer | ||
105 | return (True,(chan,t)) | ||
106 | (is_new,entry) <- maybe newEntry | ||
107 | ( \(chan,t) -> do | ||
108 | st <- threadStatus t | ||
109 | let running = do | ||
110 | -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer | ||
111 | return (False,(chan,t)) | ||
112 | died = do | ||
113 | -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer | ||
114 | newEntry | ||
115 | case st of | ||
116 | ThreadRunning -> running | ||
117 | ThreadBlocked _ -> running | ||
118 | ThreadDied -> died | ||
119 | ThreadFinished -> died | ||
120 | ) | ||
121 | found | ||
122 | -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg | ||
123 | atomically $ writeTChan (fst entry) msg | ||
124 | when is_new . atomically $ | ||
125 | readTVar cons >>= writeTVar cons . Map.insert peer entry | ||
126 | |||
127 | |||
128 | connect_to_server chan peer toPeer = (>> return ()) . runMaybeT $ do | ||
129 | let port = 5269 :: Int | ||
130 | -- We'll cache Presence notifications until the socket | ||
131 | -- is ready. | ||
132 | cached <- liftIO $ newIORef emptyCache | ||
133 | |||
134 | let cacheCmd msg cached = modifyIORef' cached (updateCache msg) | ||
135 | |||
136 | fix $ \sendmsgs -> do | ||
137 | connected <- liftIO . async $ connect' (peerAddr peer) port | ||
138 | |||
139 | sock <- MaybeT . fix $ \loop -> do | ||
140 | e <- atomically $ orElse | ||
141 | (fmap Right $ waitSTM connected) | ||
142 | (fmap Left $ readTChan chan) | ||
143 | case e of | ||
144 | Left cmd -> cacheCmd cmd cached >> loop | ||
145 | Right sock -> return sock | ||
146 | |||
147 | retry <- do | ||
148 | (cache,snk) <- liftIO $ do | ||
149 | h <- socketToHandle sock ReadWriteMode | ||
150 | hSetBuffering h NoBuffering | ||
151 | cache <- readIORef $ cached | ||
152 | -- hint garbage collector: we're done with this... | ||
153 | writeIORef cached emptyCache | ||
154 | return (cache,packetSink h) | ||
155 | MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk | ||
156 | |||
157 | liftIO $ cacheCmd retry cached | ||
158 | -- liftIO $ putStrLn $ "retrying " ++ show retry | ||
159 | sendmsgs | ||
160 | |||
161 | connect' :: SockAddr -> Int -> IO (Maybe Socket) | ||
162 | connect' addr port = do | ||
163 | proto <- getProtocolNumber "tcp" | ||
164 | let getport (SockAddrInet port _) = port | ||
165 | getport (SockAddrInet6 port _ _ _) = port | ||
166 | let doException e = do | ||
167 | -- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e | ||
168 | return Nothing | ||
169 | handleIO doException | ||
170 | $ tryToConnect proto (addr `withPort` port) | ||
171 | where | ||
172 | tryToConnect proto addr = | ||
173 | bracketOnError | ||
174 | (socket (socketFamily addr) Stream proto) | ||
175 | (sClose ) -- only done if there's an error | ||
176 | (\sock -> do | ||
177 | connect sock addr | ||
178 | return (Just sock) -- socketToHandle sock ReadWriteMode | ||
179 | ) | ||
180 | |||
181 | |||
182 | type OutBoundXML sock cache msg = | ||
183 | sock | ||
184 | -> cache | ||
185 | -> TChan msg | ||
186 | -> (Maybe msg -> IO ()) | ||
187 | -> Source IO [XML.Event] | ||
188 | |||
189 | handleOutgoingToPeer | ||
190 | :: SocketLike sock => | ||
191 | OutBoundXML sock cache msg | ||
192 | -> sock | ||
193 | -> cache | ||
194 | -> TChan msg | ||
195 | -> ByteStringSink | ||
196 | -> IO (Maybe msg) | ||
197 | handleOutgoingToPeer toPeer sock cache chan snk = do | ||
198 | p <- getPeerName sock | ||
199 | -- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p) | ||
200 | failed <- newIORef Nothing | ||
201 | let failure cmd = do | ||
202 | writeIORef failed cmd | ||
203 | -- putStrLn $ "Failed: " ++ show cmd | ||
204 | finally ( | ||
205 | handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk | ||
206 | ) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p) | ||
207 | readIORef failed | ||
208 | |||