summaryrefslogtreecommitdiff
path: root/SendMessage.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2013-07-14 03:36:43 -0400
committerjoe <joe@jerkface.net>2013-07-14 03:36:43 -0400
commita400eb11dcd5d8e3a94f2d3505a6b66c63cf54f5 (patch)
treebbf56c74adaf35d014c73f718424a3a7d864a5ed /SendMessage.hs
parent36339ba1369717858639299a323bd0dac7f95c51 (diff)
Factored the cache and retry pattern into module SendMessage
Diffstat (limited to 'SendMessage.hs')
-rw-r--r--SendMessage.hs208
1 files changed, 0 insertions, 208 deletions
diff --git a/SendMessage.hs b/SendMessage.hs
deleted file mode 100644
index 6e8ea2b9..00000000
--- a/SendMessage.hs
+++ /dev/null
@@ -1,208 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE TypeFamilies #-}
3module SendMessage
4 ( sendMessage
5 , CommandCache(..)
6 , newOutgoingConnections
7 , OutgoingConnections
8 ) where
9
10import Control.Monad
11import Control.Monad.Fix
12import Control.Monad.IO.Class
13import Control.Monad.Trans.Maybe
14import Control.Concurrent.STM
15import Control.Concurrent (forkIO)
16import Control.Concurrent.Async (async,waitSTM)
17import Control.Exception as E
18 ( bracketOnError
19 , finally
20 )
21import GHC.Conc
22 ( threadStatus
23 , ThreadStatus(..)
24 , ThreadId
25 )
26import Data.IORef
27import qualified Data.Map as Map
28import Data.Map as Map (Map)
29import Data.XML.Types as XML (Event)
30import System.IO
31 ( BufferMode(..)
32 , IOMode(..)
33 , hSetBuffering
34 )
35import Network.BSD
36 ( getProtocolNumber
37 )
38import Network.Socket
39 ( connect
40 , socketToHandle
41 , sClose
42 , Socket(..)
43 , socket
44 , SocketType(..)
45 )
46
47import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily)
48import SocketLike
49import ServerC (packetSink)
50import ControlMaybe (handleIO,handleIO_)
51import Data.Conduit (Sink,Source)
52import qualified Data.ByteString as S (ByteString)
53import XMLToByteStrings
54
55type ByteStringSink = Sink S.ByteString IO ()
56
57-- |Strict version of 'modifyIORef'
58modifyIORef' :: IORef a -> (a -> a) -> IO ()
59modifyIORef' ref f = do
60 x <- readIORef ref
61 let x' = f x
62 x' `seq` writeIORef ref x'
63
64
65class CommandCache cache where
66 type CacheableCommand cache
67
68 emptyCache :: cache
69
70 updateCache :: CacheableCommand cache -> cache -> cache
71
72
73data OutgoingConnections cache =
74 OutgoingConnections (TVar (Map Peer (TChan (CacheableCommand cache), ThreadId)))
75 (OutBoundXML RestrictedSocket cache (CacheableCommand cache))
76
77
78newOutgoingConnections ::
79 ( RestrictedSocket
80 -> cache
81 -> TChan (CacheableCommand cache)
82 -> (Maybe (CacheableCommand cache) -> IO ())
83 -> Source IO [XML.Event] )
84
85 -> STM (OutgoingConnections cache)
86
87newOutgoingConnections interpretCommands = do
88 remotes <- newTVar (Map.empty)
89 return (OutgoingConnections remotes interpretCommands)
90
91
92
93sendMessage
94 :: CommandCache a =>
95 OutgoingConnections a -> CacheableCommand a -> Peer -> IO ()
96sendMessage (OutgoingConnections cons interpretCommands) msg peer0 = do
97 let peer = discardPort peer0
98 found <- atomically $ do
99 consmap <- readTVar cons
100 return (Map.lookup peer consmap)
101 let newEntry = do
102 chan <- atomically newTChan
103 t <- forkIO $ connect_to_server chan peer interpretCommands -- (newCache,updateCache,toPeer)
104 -- L.putStrLn $ "remote-map new: " <++> showPeer peer
105 return (True,(chan,t))
106 (is_new,entry) <- maybe newEntry
107 ( \(chan,t) -> do
108 st <- threadStatus t
109 let running = do
110 -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer
111 return (False,(chan,t))
112 died = do
113 -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer
114 newEntry
115 case st of
116 ThreadRunning -> running
117 ThreadBlocked _ -> running
118 ThreadDied -> died
119 ThreadFinished -> died
120 )
121 found
122 -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg
123 atomically $ writeTChan (fst entry) msg
124 when is_new . atomically $
125 readTVar cons >>= writeTVar cons . Map.insert peer entry
126
127
128connect_to_server chan peer toPeer = (>> return ()) . runMaybeT $ do
129 let port = 5269 :: Int
130 -- We'll cache Presence notifications until the socket
131 -- is ready.
132 cached <- liftIO $ newIORef emptyCache
133
134 let cacheCmd msg cached = modifyIORef' cached (updateCache msg)
135
136 fix $ \sendmsgs -> do
137 connected <- liftIO . async $ connect' (peerAddr peer) port
138
139 sock <- MaybeT . fix $ \loop -> do
140 e <- atomically $ orElse
141 (fmap Right $ waitSTM connected)
142 (fmap Left $ readTChan chan)
143 case e of
144 Left cmd -> cacheCmd cmd cached >> loop
145 Right sock -> return sock
146
147 retry <- do
148 (cache,snk) <- liftIO $ do
149 h <- socketToHandle sock ReadWriteMode
150 hSetBuffering h NoBuffering
151 cache <- readIORef $ cached
152 -- hint garbage collector: we're done with this...
153 writeIORef cached emptyCache
154 return (cache,packetSink h)
155 MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk
156
157 liftIO $ cacheCmd retry cached
158 -- liftIO $ putStrLn $ "retrying " ++ show retry
159 sendmsgs
160
161connect' :: SockAddr -> Int -> IO (Maybe Socket)
162connect' addr port = do
163 proto <- getProtocolNumber "tcp"
164 let getport (SockAddrInet port _) = port
165 getport (SockAddrInet6 port _ _ _) = port
166 let doException e = do
167 -- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e
168 return Nothing
169 handleIO doException
170 $ tryToConnect proto (addr `withPort` port)
171 where
172 tryToConnect proto addr =
173 bracketOnError
174 (socket (socketFamily addr) Stream proto)
175 (sClose ) -- only done if there's an error
176 (\sock -> do
177 connect sock addr
178 return (Just sock) -- socketToHandle sock ReadWriteMode
179 )
180
181
182type OutBoundXML sock cache msg =
183 sock
184 -> cache
185 -> TChan msg
186 -> (Maybe msg -> IO ())
187 -> Source IO [XML.Event]
188
189handleOutgoingToPeer
190 :: SocketLike sock =>
191 OutBoundXML sock cache msg
192 -> sock
193 -> cache
194 -> TChan msg
195 -> ByteStringSink
196 -> IO (Maybe msg)
197handleOutgoingToPeer toPeer sock cache chan snk = do
198 p <- getPeerName sock
199 -- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p)
200 failed <- newIORef Nothing
201 let failure cmd = do
202 writeIORef failed cmd
203 -- putStrLn $ "Failed: " ++ show cmd
204 finally (
205 handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk
206 ) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p)
207 readIORef failed
208