diff options
author | joe <joe@jerkface.net> | 2013-07-14 03:36:43 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2013-07-14 03:36:43 -0400 |
commit | a400eb11dcd5d8e3a94f2d3505a6b66c63cf54f5 (patch) | |
tree | bbf56c74adaf35d014c73f718424a3a7d864a5ed /Presence | |
parent | 36339ba1369717858639299a323bd0dac7f95c51 (diff) |
Factored the cache and retry pattern into module SendMessage
Diffstat (limited to 'Presence')
-rw-r--r-- | Presence/SendMessage.hs | 208 | ||||
-rw-r--r-- | Presence/XMPP.hs | 104 | ||||
-rw-r--r-- | Presence/main.hs | 4 |
3 files changed, 241 insertions, 75 deletions
diff --git a/Presence/SendMessage.hs b/Presence/SendMessage.hs new file mode 100644 index 00000000..6e8ea2b9 --- /dev/null +++ b/Presence/SendMessage.hs | |||
@@ -0,0 +1,208 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE TypeFamilies #-} | ||
3 | module SendMessage | ||
4 | ( sendMessage | ||
5 | , CommandCache(..) | ||
6 | , newOutgoingConnections | ||
7 | , OutgoingConnections | ||
8 | ) where | ||
9 | |||
10 | import Control.Monad | ||
11 | import Control.Monad.Fix | ||
12 | import Control.Monad.IO.Class | ||
13 | import Control.Monad.Trans.Maybe | ||
14 | import Control.Concurrent.STM | ||
15 | import Control.Concurrent (forkIO) | ||
16 | import Control.Concurrent.Async (async,waitSTM) | ||
17 | import Control.Exception as E | ||
18 | ( bracketOnError | ||
19 | , finally | ||
20 | ) | ||
21 | import GHC.Conc | ||
22 | ( threadStatus | ||
23 | , ThreadStatus(..) | ||
24 | , ThreadId | ||
25 | ) | ||
26 | import Data.IORef | ||
27 | import qualified Data.Map as Map | ||
28 | import Data.Map as Map (Map) | ||
29 | import Data.XML.Types as XML (Event) | ||
30 | import System.IO | ||
31 | ( BufferMode(..) | ||
32 | , IOMode(..) | ||
33 | , hSetBuffering | ||
34 | ) | ||
35 | import Network.BSD | ||
36 | ( getProtocolNumber | ||
37 | ) | ||
38 | import Network.Socket | ||
39 | ( connect | ||
40 | , socketToHandle | ||
41 | , sClose | ||
42 | , Socket(..) | ||
43 | , socket | ||
44 | , SocketType(..) | ||
45 | ) | ||
46 | |||
47 | import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily) | ||
48 | import SocketLike | ||
49 | import ServerC (packetSink) | ||
50 | import ControlMaybe (handleIO,handleIO_) | ||
51 | import Data.Conduit (Sink,Source) | ||
52 | import qualified Data.ByteString as S (ByteString) | ||
53 | import XMLToByteStrings | ||
54 | |||
55 | type ByteStringSink = Sink S.ByteString IO () | ||
56 | |||
57 | -- |Strict version of 'modifyIORef' | ||
58 | modifyIORef' :: IORef a -> (a -> a) -> IO () | ||
59 | modifyIORef' ref f = do | ||
60 | x <- readIORef ref | ||
61 | let x' = f x | ||
62 | x' `seq` writeIORef ref x' | ||
63 | |||
64 | |||
65 | class CommandCache cache where | ||
66 | type CacheableCommand cache | ||
67 | |||
68 | emptyCache :: cache | ||
69 | |||
70 | updateCache :: CacheableCommand cache -> cache -> cache | ||
71 | |||
72 | |||
73 | data OutgoingConnections cache = | ||
74 | OutgoingConnections (TVar (Map Peer (TChan (CacheableCommand cache), ThreadId))) | ||
75 | (OutBoundXML RestrictedSocket cache (CacheableCommand cache)) | ||
76 | |||
77 | |||
78 | newOutgoingConnections :: | ||
79 | ( RestrictedSocket | ||
80 | -> cache | ||
81 | -> TChan (CacheableCommand cache) | ||
82 | -> (Maybe (CacheableCommand cache) -> IO ()) | ||
83 | -> Source IO [XML.Event] ) | ||
84 | |||
85 | -> STM (OutgoingConnections cache) | ||
86 | |||
87 | newOutgoingConnections interpretCommands = do | ||
88 | remotes <- newTVar (Map.empty) | ||
89 | return (OutgoingConnections remotes interpretCommands) | ||
90 | |||
91 | |||
92 | |||
93 | sendMessage | ||
94 | :: CommandCache a => | ||
95 | OutgoingConnections a -> CacheableCommand a -> Peer -> IO () | ||
96 | sendMessage (OutgoingConnections cons interpretCommands) msg peer0 = do | ||
97 | let peer = discardPort peer0 | ||
98 | found <- atomically $ do | ||
99 | consmap <- readTVar cons | ||
100 | return (Map.lookup peer consmap) | ||
101 | let newEntry = do | ||
102 | chan <- atomically newTChan | ||
103 | t <- forkIO $ connect_to_server chan peer interpretCommands -- (newCache,updateCache,toPeer) | ||
104 | -- L.putStrLn $ "remote-map new: " <++> showPeer peer | ||
105 | return (True,(chan,t)) | ||
106 | (is_new,entry) <- maybe newEntry | ||
107 | ( \(chan,t) -> do | ||
108 | st <- threadStatus t | ||
109 | let running = do | ||
110 | -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer | ||
111 | return (False,(chan,t)) | ||
112 | died = do | ||
113 | -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer | ||
114 | newEntry | ||
115 | case st of | ||
116 | ThreadRunning -> running | ||
117 | ThreadBlocked _ -> running | ||
118 | ThreadDied -> died | ||
119 | ThreadFinished -> died | ||
120 | ) | ||
121 | found | ||
122 | -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg | ||
123 | atomically $ writeTChan (fst entry) msg | ||
124 | when is_new . atomically $ | ||
125 | readTVar cons >>= writeTVar cons . Map.insert peer entry | ||
126 | |||
127 | |||
128 | connect_to_server chan peer toPeer = (>> return ()) . runMaybeT $ do | ||
129 | let port = 5269 :: Int | ||
130 | -- We'll cache Presence notifications until the socket | ||
131 | -- is ready. | ||
132 | cached <- liftIO $ newIORef emptyCache | ||
133 | |||
134 | let cacheCmd msg cached = modifyIORef' cached (updateCache msg) | ||
135 | |||
136 | fix $ \sendmsgs -> do | ||
137 | connected <- liftIO . async $ connect' (peerAddr peer) port | ||
138 | |||
139 | sock <- MaybeT . fix $ \loop -> do | ||
140 | e <- atomically $ orElse | ||
141 | (fmap Right $ waitSTM connected) | ||
142 | (fmap Left $ readTChan chan) | ||
143 | case e of | ||
144 | Left cmd -> cacheCmd cmd cached >> loop | ||
145 | Right sock -> return sock | ||
146 | |||
147 | retry <- do | ||
148 | (cache,snk) <- liftIO $ do | ||
149 | h <- socketToHandle sock ReadWriteMode | ||
150 | hSetBuffering h NoBuffering | ||
151 | cache <- readIORef $ cached | ||
152 | -- hint garbage collector: we're done with this... | ||
153 | writeIORef cached emptyCache | ||
154 | return (cache,packetSink h) | ||
155 | MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk | ||
156 | |||
157 | liftIO $ cacheCmd retry cached | ||
158 | -- liftIO $ putStrLn $ "retrying " ++ show retry | ||
159 | sendmsgs | ||
160 | |||
161 | connect' :: SockAddr -> Int -> IO (Maybe Socket) | ||
162 | connect' addr port = do | ||
163 | proto <- getProtocolNumber "tcp" | ||
164 | let getport (SockAddrInet port _) = port | ||
165 | getport (SockAddrInet6 port _ _ _) = port | ||
166 | let doException e = do | ||
167 | -- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e | ||
168 | return Nothing | ||
169 | handleIO doException | ||
170 | $ tryToConnect proto (addr `withPort` port) | ||
171 | where | ||
172 | tryToConnect proto addr = | ||
173 | bracketOnError | ||
174 | (socket (socketFamily addr) Stream proto) | ||
175 | (sClose ) -- only done if there's an error | ||
176 | (\sock -> do | ||
177 | connect sock addr | ||
178 | return (Just sock) -- socketToHandle sock ReadWriteMode | ||
179 | ) | ||
180 | |||
181 | |||
182 | type OutBoundXML sock cache msg = | ||
183 | sock | ||
184 | -> cache | ||
185 | -> TChan msg | ||
186 | -> (Maybe msg -> IO ()) | ||
187 | -> Source IO [XML.Event] | ||
188 | |||
189 | handleOutgoingToPeer | ||
190 | :: SocketLike sock => | ||
191 | OutBoundXML sock cache msg | ||
192 | -> sock | ||
193 | -> cache | ||
194 | -> TChan msg | ||
195 | -> ByteStringSink | ||
196 | -> IO (Maybe msg) | ||
197 | handleOutgoingToPeer toPeer sock cache chan snk = do | ||
198 | p <- getPeerName sock | ||
199 | -- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p) | ||
200 | failed <- newIORef Nothing | ||
201 | let failure cmd = do | ||
202 | writeIORef failed cmd | ||
203 | -- putStrLn $ "Failed: " ++ show cmd | ||
204 | finally ( | ||
205 | handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk | ||
206 | ) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p) | ||
207 | readIORef failed | ||
208 | |||
diff --git a/Presence/XMPP.hs b/Presence/XMPP.hs index a469c08e..339cc6f2 100644 --- a/Presence/XMPP.hs +++ b/Presence/XMPP.hs | |||
@@ -1,6 +1,7 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | 1 | {-# LANGUAGE OverloadedStrings #-} |
2 | {-# LANGUAGE FlexibleContexts #-} | 2 | {-# LANGUAGE FlexibleContexts #-} |
3 | {-# LANGUAGE ViewPatterns #-} | 3 | {-# LANGUAGE ViewPatterns #-} |
4 | {-# LANGUAGE TypeFamilies #-} | ||
4 | module XMPP | 5 | module XMPP |
5 | ( module XMPPTypes | 6 | ( module XMPPTypes |
6 | , listenForXmppClients | 7 | , listenForXmppClients |
@@ -9,6 +10,10 @@ module XMPP | |||
9 | , seekRemotePeers | 10 | , seekRemotePeers |
10 | , quitListening | 11 | , quitListening |
11 | , OutBoundMessage(..) | 12 | , OutBoundMessage(..) |
13 | , OutgoingConnections | ||
14 | , CachedMessages | ||
15 | , toPeer | ||
16 | , newOutgoingConnections | ||
12 | , sendMessage | 17 | , sendMessage |
13 | ) where | 18 | ) where |
14 | 19 | ||
@@ -17,14 +22,13 @@ import XMPPTypes | |||
17 | import ByteStringOperators | 22 | import ByteStringOperators |
18 | import ControlMaybe | 23 | import ControlMaybe |
19 | import XMLToByteStrings | 24 | import XMLToByteStrings |
20 | 25 | import SendMessage | |
21 | 26 | ||
22 | import Data.Maybe (catMaybes) | 27 | import Data.Maybe (catMaybes) |
23 | import Data.HList | 28 | import Data.HList |
24 | import Network.Socket | 29 | import Network.Socket |
25 | ( Family | 30 | ( Family |
26 | , connect | 31 | , connect |
27 | , socketToHandle | ||
28 | , sClose | 32 | , sClose |
29 | , Socket(..) | 33 | , Socket(..) |
30 | , socket | 34 | , socket |
@@ -33,11 +37,6 @@ import Network.Socket | |||
33 | import Network.BSD | 37 | import Network.BSD |
34 | ( getProtocolNumber | 38 | ( getProtocolNumber |
35 | ) | 39 | ) |
36 | import System.IO | ||
37 | ( BufferMode(..) | ||
38 | , IOMode(..) | ||
39 | , hSetBuffering | ||
40 | ) | ||
41 | import Control.Concurrent.STM | 40 | import Control.Concurrent.STM |
42 | import Data.Conduit | 41 | import Data.Conduit |
43 | import Data.ByteString (ByteString) | 42 | import Data.ByteString (ByteString) |
@@ -45,7 +44,6 @@ import qualified Data.ByteString.Lazy.Char8 as L | |||
45 | ( putStrLn | 44 | ( putStrLn |
46 | , fromChunks | 45 | , fromChunks |
47 | ) | 46 | ) |
48 | import Control.Concurrent (forkIO) | ||
49 | import Control.Concurrent.Async | 47 | import Control.Concurrent.Async |
50 | import Control.Exception as E | 48 | import Control.Exception as E |
51 | ( handle | 49 | ( handle |
@@ -72,11 +70,6 @@ import Data.Set as Set (Set,(\\)) | |||
72 | import qualified Data.Set as Set | 70 | import qualified Data.Set as Set |
73 | import qualified Data.Map as Map | 71 | import qualified Data.Map as Map |
74 | import Data.Map as Map (Map) | 72 | import Data.Map as Map (Map) |
75 | import GHC.Conc | ||
76 | ( threadStatus | ||
77 | , ThreadStatus(..) | ||
78 | , ThreadId | ||
79 | ) | ||
80 | 73 | ||
81 | textToByteString x = L.fromChunks [S.encodeUtf8 x] | 74 | textToByteString x = L.fromChunks [S.encodeUtf8 x] |
82 | 75 | ||
@@ -914,65 +907,28 @@ data CachedMessages = CachedMessages | |||
914 | , probes :: Map JID (Set (Bool,JID)) -- False means solicitation rather than probe | 907 | , probes :: Map JID (Set (Bool,JID)) -- False means solicitation rather than probe |
915 | , approvals :: Map JID (Set (Bool,JID) ) -- False means rejection rather than approval | 908 | , approvals :: Map JID (Set (Bool,JID) ) -- False means rejection rather than approval |
916 | } | 909 | } |
917 | newCache = CachedMessages Map.empty Map.empty Map.empty | 910 | |
918 | 911 | instance CommandCache CachedMessages where | |
919 | connect_to_server chan peer = (>> return ()) . runMaybeT $ do | 912 | type CacheableCommand CachedMessages = OutBoundMessage |
920 | let port = 5269 :: Int | 913 | emptyCache = CachedMessages Map.empty Map.empty Map.empty |
921 | -- We'll cache Presence notifications until the socket | 914 | |
922 | -- is ready. | 915 | updateCache (OutBoundPresence (Presence jid Offline)) cache = |
923 | cached <- liftIO $ newIORef newCache | 916 | cache { presences=Map.delete jid . presences $ cache } |
924 | 917 | updateCache (OutBoundPresence p@(Presence jid st)) cache = | |
925 | let mmInsert val key mm = Map.alter f key mm | 918 | cache { presences=Map.insert jid st . presences $ cache } |
926 | where | 919 | updateCache (PresenceProbe from to) cache = |
927 | f Nothing = Just $ Set.singleton val | 920 | cache { probes = mmInsert (True,from) to $ probes cache } |
928 | f (Just set) = Just $ Set.insert val set | 921 | updateCache (Solicitation from to) cache = |
929 | let cacheCmd (OutBoundPresence (Presence jid Offline)) cached = do | 922 | cache { probes= mmInsert (False,from) to $ probes cache } |
930 | cache <- readIORef cached | 923 | updateCache (Approval from to) cache = |
931 | writeIORef cached (cache { presences=Map.delete jid . presences $ cache }) | 924 | cache { approvals= mmInsert (True,from) to $ approvals cache } |
932 | cacheCmd (OutBoundPresence p@(Presence jid st)) cached = do | 925 | updateCache (Rejection from to) cache = |
933 | cache <- readIORef cached | 926 | cache { approvals= mmInsert (False,from) to $ approvals cache } |
934 | writeIORef cached (cache { presences=Map.insert jid st . presences $ cache }) | 927 | |
935 | cacheCmd (PresenceProbe from to) cached = do | 928 | mmInsert val key mm = Map.alter f key mm |
936 | cache <- readIORef cached | 929 | where |
937 | let probes' = mmInsert (True,from) to $ probes cache | 930 | f Nothing = Just $ Set.singleton val |
938 | writeIORef cached (cache { probes=probes' }) | 931 | f (Just set) = Just $ Set.insert val set |
939 | cacheCmd (Solicitation from to) cached = do | ||
940 | cache <- readIORef cached | ||
941 | let probes' = mmInsert (False,from) to $ probes cache | ||
942 | writeIORef cached (cache { probes=probes' }) | ||
943 | cacheCmd (Approval from to) cached = do | ||
944 | cache <- readIORef cached | ||
945 | let approvals' = mmInsert (True,from) to $ approvals cache | ||
946 | writeIORef cached (cache { approvals=approvals' }) | ||
947 | cacheCmd (Rejection from to) cached = do | ||
948 | cache <- readIORef cached | ||
949 | let approvals' = mmInsert (False,from) to $ approvals cache | ||
950 | writeIORef cached (cache { approvals=approvals' }) | ||
951 | |||
952 | fix $ \sendmsgs -> do | ||
953 | connected <- liftIO . async $ connect' (peerAddr peer) port | ||
954 | |||
955 | sock <- MaybeT . fix $ \loop -> do | ||
956 | e <- atomically $ orElse | ||
957 | (fmap Right $ waitSTM connected) | ||
958 | (fmap Left $ readTChan chan) | ||
959 | case e of | ||
960 | Left cmd -> cacheCmd cmd cached >> loop | ||
961 | Right sock -> return sock | ||
962 | |||
963 | retry <- do | ||
964 | (cache,snk) <- liftIO $ do | ||
965 | h <- socketToHandle sock ReadWriteMode | ||
966 | hSetBuffering h NoBuffering | ||
967 | cache <- readIORef $ cached | ||
968 | -- hint garbage collector: we're done with this... | ||
969 | writeIORef cached newCache | ||
970 | return (cache,packetSink h) | ||
971 | MaybeT $ handleOutgoingToPeer (restrictSocket sock) cache chan snk | ||
972 | |||
973 | liftIO $ cacheCmd retry cached | ||
974 | liftIO $ putStrLn $ "retrying " ++ show retry | ||
975 | sendmsgs | ||
976 | 932 | ||
977 | 933 | ||
978 | greetPeer = | 934 | greetPeer = |
@@ -1139,6 +1095,7 @@ connect' addr port = do | |||
1139 | ) | 1095 | ) |
1140 | 1096 | ||
1141 | 1097 | ||
1098 | {- | ||
1142 | sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO () | 1099 | sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO () |
1143 | sendMessage cons msg peer0 = do | 1100 | sendMessage cons msg peer0 = do |
1144 | let peer = discardPort peer0 | 1101 | let peer = discardPort peer0 |
@@ -1170,11 +1127,12 @@ sendMessage cons msg peer0 = do | |||
1170 | atomically $ writeTChan (fst entry) msg | 1127 | atomically $ writeTChan (fst entry) msg |
1171 | when is_new . atomically $ | 1128 | when is_new . atomically $ |
1172 | readTVar cons >>= writeTVar cons . Map.insert peer entry | 1129 | readTVar cons >>= writeTVar cons . Map.insert peer entry |
1130 | -} | ||
1173 | 1131 | ||
1174 | 1132 | ||
1175 | 1133 | ||
1176 | seekRemotePeers :: JabberPeerSession config => | 1134 | seekRemotePeers :: JabberPeerSession config => |
1177 | XMPPPeerClass config -> TChan Presence -> TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> IO b0 | 1135 | XMPPPeerClass config -> TChan Presence -> OutgoingConnections CachedMessages -> IO b0 |
1178 | seekRemotePeers config chan server_connections = do | 1136 | seekRemotePeers config chan server_connections = do |
1179 | fix $ \loop -> do | 1137 | fix $ \loop -> do |
1180 | event <- atomically $ readTChan chan | 1138 | event <- atomically $ readTChan chan |
diff --git a/Presence/main.hs b/Presence/main.hs index 317dc71e..3d5ac046 100644 --- a/Presence/main.hs +++ b/Presence/main.hs | |||
@@ -89,7 +89,7 @@ data PresenceState = PresenceState | |||
89 | , remoteUsers :: TVar (Map Peer (RefCount,TVar (MultiMap JabberUser (JabberResource,JabberShow)))) | 89 | , remoteUsers :: TVar (Map Peer (RefCount,TVar (MultiMap JabberUser (JabberResource,JabberShow)))) |
90 | 90 | ||
91 | -- outGoingConnections - a set of channels that may be used to send messages to remote peers. | 91 | -- outGoingConnections - a set of channels that may be used to send messages to remote peers. |
92 | , outGoingConnections :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) | 92 | , outGoingConnections :: OutgoingConnections CachedMessages |
93 | } | 93 | } |
94 | 94 | ||
95 | 95 | ||
@@ -108,7 +108,7 @@ newPresenceState hostname = atomically $ do | |||
108 | locals_greedy <- newEmptyTMVar | 108 | locals_greedy <- newEmptyTMVar |
109 | rchan <- newEmptyTMVar | 109 | rchan <- newEmptyTMVar |
110 | remotes <- newTVar (Map.empty) | 110 | remotes <- newTVar (Map.empty) |
111 | server_connections <- newServerConnections | 111 | server_connections <- newOutgoingConnections toPeer |
112 | return $ PresenceState hostname tty us subs locals_greedy rchan remotes server_connections | 112 | return $ PresenceState hostname tty us subs locals_greedy rchan remotes server_connections |
113 | 113 | ||
114 | 114 | ||