summaryrefslogtreecommitdiff
path: root/Presence
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2013-07-14 03:36:43 -0400
committerjoe <joe@jerkface.net>2013-07-14 03:36:43 -0400
commita400eb11dcd5d8e3a94f2d3505a6b66c63cf54f5 (patch)
treebbf56c74adaf35d014c73f718424a3a7d864a5ed /Presence
parent36339ba1369717858639299a323bd0dac7f95c51 (diff)
Factored the cache and retry pattern into module SendMessage
Diffstat (limited to 'Presence')
-rw-r--r--Presence/SendMessage.hs208
-rw-r--r--Presence/XMPP.hs104
-rw-r--r--Presence/main.hs4
3 files changed, 241 insertions, 75 deletions
diff --git a/Presence/SendMessage.hs b/Presence/SendMessage.hs
new file mode 100644
index 00000000..6e8ea2b9
--- /dev/null
+++ b/Presence/SendMessage.hs
@@ -0,0 +1,208 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE TypeFamilies #-}
3module SendMessage
4 ( sendMessage
5 , CommandCache(..)
6 , newOutgoingConnections
7 , OutgoingConnections
8 ) where
9
10import Control.Monad
11import Control.Monad.Fix
12import Control.Monad.IO.Class
13import Control.Monad.Trans.Maybe
14import Control.Concurrent.STM
15import Control.Concurrent (forkIO)
16import Control.Concurrent.Async (async,waitSTM)
17import Control.Exception as E
18 ( bracketOnError
19 , finally
20 )
21import GHC.Conc
22 ( threadStatus
23 , ThreadStatus(..)
24 , ThreadId
25 )
26import Data.IORef
27import qualified Data.Map as Map
28import Data.Map as Map (Map)
29import Data.XML.Types as XML (Event)
30import System.IO
31 ( BufferMode(..)
32 , IOMode(..)
33 , hSetBuffering
34 )
35import Network.BSD
36 ( getProtocolNumber
37 )
38import Network.Socket
39 ( connect
40 , socketToHandle
41 , sClose
42 , Socket(..)
43 , socket
44 , SocketType(..)
45 )
46
47import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily)
48import SocketLike
49import ServerC (packetSink)
50import ControlMaybe (handleIO,handleIO_)
51import Data.Conduit (Sink,Source)
52import qualified Data.ByteString as S (ByteString)
53import XMLToByteStrings
54
55type ByteStringSink = Sink S.ByteString IO ()
56
57-- |Strict version of 'modifyIORef'
58modifyIORef' :: IORef a -> (a -> a) -> IO ()
59modifyIORef' ref f = do
60 x <- readIORef ref
61 let x' = f x
62 x' `seq` writeIORef ref x'
63
64
65class CommandCache cache where
66 type CacheableCommand cache
67
68 emptyCache :: cache
69
70 updateCache :: CacheableCommand cache -> cache -> cache
71
72
73data OutgoingConnections cache =
74 OutgoingConnections (TVar (Map Peer (TChan (CacheableCommand cache), ThreadId)))
75 (OutBoundXML RestrictedSocket cache (CacheableCommand cache))
76
77
78newOutgoingConnections ::
79 ( RestrictedSocket
80 -> cache
81 -> TChan (CacheableCommand cache)
82 -> (Maybe (CacheableCommand cache) -> IO ())
83 -> Source IO [XML.Event] )
84
85 -> STM (OutgoingConnections cache)
86
87newOutgoingConnections interpretCommands = do
88 remotes <- newTVar (Map.empty)
89 return (OutgoingConnections remotes interpretCommands)
90
91
92
93sendMessage
94 :: CommandCache a =>
95 OutgoingConnections a -> CacheableCommand a -> Peer -> IO ()
96sendMessage (OutgoingConnections cons interpretCommands) msg peer0 = do
97 let peer = discardPort peer0
98 found <- atomically $ do
99 consmap <- readTVar cons
100 return (Map.lookup peer consmap)
101 let newEntry = do
102 chan <- atomically newTChan
103 t <- forkIO $ connect_to_server chan peer interpretCommands -- (newCache,updateCache,toPeer)
104 -- L.putStrLn $ "remote-map new: " <++> showPeer peer
105 return (True,(chan,t))
106 (is_new,entry) <- maybe newEntry
107 ( \(chan,t) -> do
108 st <- threadStatus t
109 let running = do
110 -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer
111 return (False,(chan,t))
112 died = do
113 -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer
114 newEntry
115 case st of
116 ThreadRunning -> running
117 ThreadBlocked _ -> running
118 ThreadDied -> died
119 ThreadFinished -> died
120 )
121 found
122 -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg
123 atomically $ writeTChan (fst entry) msg
124 when is_new . atomically $
125 readTVar cons >>= writeTVar cons . Map.insert peer entry
126
127
128connect_to_server chan peer toPeer = (>> return ()) . runMaybeT $ do
129 let port = 5269 :: Int
130 -- We'll cache Presence notifications until the socket
131 -- is ready.
132 cached <- liftIO $ newIORef emptyCache
133
134 let cacheCmd msg cached = modifyIORef' cached (updateCache msg)
135
136 fix $ \sendmsgs -> do
137 connected <- liftIO . async $ connect' (peerAddr peer) port
138
139 sock <- MaybeT . fix $ \loop -> do
140 e <- atomically $ orElse
141 (fmap Right $ waitSTM connected)
142 (fmap Left $ readTChan chan)
143 case e of
144 Left cmd -> cacheCmd cmd cached >> loop
145 Right sock -> return sock
146
147 retry <- do
148 (cache,snk) <- liftIO $ do
149 h <- socketToHandle sock ReadWriteMode
150 hSetBuffering h NoBuffering
151 cache <- readIORef $ cached
152 -- hint garbage collector: we're done with this...
153 writeIORef cached emptyCache
154 return (cache,packetSink h)
155 MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk
156
157 liftIO $ cacheCmd retry cached
158 -- liftIO $ putStrLn $ "retrying " ++ show retry
159 sendmsgs
160
161connect' :: SockAddr -> Int -> IO (Maybe Socket)
162connect' addr port = do
163 proto <- getProtocolNumber "tcp"
164 let getport (SockAddrInet port _) = port
165 getport (SockAddrInet6 port _ _ _) = port
166 let doException e = do
167 -- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e
168 return Nothing
169 handleIO doException
170 $ tryToConnect proto (addr `withPort` port)
171 where
172 tryToConnect proto addr =
173 bracketOnError
174 (socket (socketFamily addr) Stream proto)
175 (sClose ) -- only done if there's an error
176 (\sock -> do
177 connect sock addr
178 return (Just sock) -- socketToHandle sock ReadWriteMode
179 )
180
181
182type OutBoundXML sock cache msg =
183 sock
184 -> cache
185 -> TChan msg
186 -> (Maybe msg -> IO ())
187 -> Source IO [XML.Event]
188
189handleOutgoingToPeer
190 :: SocketLike sock =>
191 OutBoundXML sock cache msg
192 -> sock
193 -> cache
194 -> TChan msg
195 -> ByteStringSink
196 -> IO (Maybe msg)
197handleOutgoingToPeer toPeer sock cache chan snk = do
198 p <- getPeerName sock
199 -- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p)
200 failed <- newIORef Nothing
201 let failure cmd = do
202 writeIORef failed cmd
203 -- putStrLn $ "Failed: " ++ show cmd
204 finally (
205 handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk
206 ) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p)
207 readIORef failed
208
diff --git a/Presence/XMPP.hs b/Presence/XMPP.hs
index a469c08e..339cc6f2 100644
--- a/Presence/XMPP.hs
+++ b/Presence/XMPP.hs
@@ -1,6 +1,7 @@
1{-# LANGUAGE OverloadedStrings #-} 1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE FlexibleContexts #-} 2{-# LANGUAGE FlexibleContexts #-}
3{-# LANGUAGE ViewPatterns #-} 3{-# LANGUAGE ViewPatterns #-}
4{-# LANGUAGE TypeFamilies #-}
4module XMPP 5module XMPP
5 ( module XMPPTypes 6 ( module XMPPTypes
6 , listenForXmppClients 7 , listenForXmppClients
@@ -9,6 +10,10 @@ module XMPP
9 , seekRemotePeers 10 , seekRemotePeers
10 , quitListening 11 , quitListening
11 , OutBoundMessage(..) 12 , OutBoundMessage(..)
13 , OutgoingConnections
14 , CachedMessages
15 , toPeer
16 , newOutgoingConnections
12 , sendMessage 17 , sendMessage
13 ) where 18 ) where
14 19
@@ -17,14 +22,13 @@ import XMPPTypes
17import ByteStringOperators 22import ByteStringOperators
18import ControlMaybe 23import ControlMaybe
19import XMLToByteStrings 24import XMLToByteStrings
20 25import SendMessage
21 26
22import Data.Maybe (catMaybes) 27import Data.Maybe (catMaybes)
23import Data.HList 28import Data.HList
24import Network.Socket 29import Network.Socket
25 ( Family 30 ( Family
26 , connect 31 , connect
27 , socketToHandle
28 , sClose 32 , sClose
29 , Socket(..) 33 , Socket(..)
30 , socket 34 , socket
@@ -33,11 +37,6 @@ import Network.Socket
33import Network.BSD 37import Network.BSD
34 ( getProtocolNumber 38 ( getProtocolNumber
35 ) 39 )
36import System.IO
37 ( BufferMode(..)
38 , IOMode(..)
39 , hSetBuffering
40 )
41import Control.Concurrent.STM 40import Control.Concurrent.STM
42import Data.Conduit 41import Data.Conduit
43import Data.ByteString (ByteString) 42import Data.ByteString (ByteString)
@@ -45,7 +44,6 @@ import qualified Data.ByteString.Lazy.Char8 as L
45 ( putStrLn 44 ( putStrLn
46 , fromChunks 45 , fromChunks
47 ) 46 )
48import Control.Concurrent (forkIO)
49import Control.Concurrent.Async 47import Control.Concurrent.Async
50import Control.Exception as E 48import Control.Exception as E
51 ( handle 49 ( handle
@@ -72,11 +70,6 @@ import Data.Set as Set (Set,(\\))
72import qualified Data.Set as Set 70import qualified Data.Set as Set
73import qualified Data.Map as Map 71import qualified Data.Map as Map
74import Data.Map as Map (Map) 72import Data.Map as Map (Map)
75import GHC.Conc
76 ( threadStatus
77 , ThreadStatus(..)
78 , ThreadId
79 )
80 73
81textToByteString x = L.fromChunks [S.encodeUtf8 x] 74textToByteString x = L.fromChunks [S.encodeUtf8 x]
82 75
@@ -914,65 +907,28 @@ data CachedMessages = CachedMessages
914 , probes :: Map JID (Set (Bool,JID)) -- False means solicitation rather than probe 907 , probes :: Map JID (Set (Bool,JID)) -- False means solicitation rather than probe
915 , approvals :: Map JID (Set (Bool,JID) ) -- False means rejection rather than approval 908 , approvals :: Map JID (Set (Bool,JID) ) -- False means rejection rather than approval
916 } 909 }
917newCache = CachedMessages Map.empty Map.empty Map.empty 910
918 911instance CommandCache CachedMessages where
919connect_to_server chan peer = (>> return ()) . runMaybeT $ do 912 type CacheableCommand CachedMessages = OutBoundMessage
920 let port = 5269 :: Int 913 emptyCache = CachedMessages Map.empty Map.empty Map.empty
921 -- We'll cache Presence notifications until the socket 914
922 -- is ready. 915 updateCache (OutBoundPresence (Presence jid Offline)) cache =
923 cached <- liftIO $ newIORef newCache 916 cache { presences=Map.delete jid . presences $ cache }
924 917 updateCache (OutBoundPresence p@(Presence jid st)) cache =
925 let mmInsert val key mm = Map.alter f key mm 918 cache { presences=Map.insert jid st . presences $ cache }
926 where 919 updateCache (PresenceProbe from to) cache =
927 f Nothing = Just $ Set.singleton val 920 cache { probes = mmInsert (True,from) to $ probes cache }
928 f (Just set) = Just $ Set.insert val set 921 updateCache (Solicitation from to) cache =
929 let cacheCmd (OutBoundPresence (Presence jid Offline)) cached = do 922 cache { probes= mmInsert (False,from) to $ probes cache }
930 cache <- readIORef cached 923 updateCache (Approval from to) cache =
931 writeIORef cached (cache { presences=Map.delete jid . presences $ cache }) 924 cache { approvals= mmInsert (True,from) to $ approvals cache }
932 cacheCmd (OutBoundPresence p@(Presence jid st)) cached = do 925 updateCache (Rejection from to) cache =
933 cache <- readIORef cached 926 cache { approvals= mmInsert (False,from) to $ approvals cache }
934 writeIORef cached (cache { presences=Map.insert jid st . presences $ cache }) 927
935 cacheCmd (PresenceProbe from to) cached = do 928mmInsert val key mm = Map.alter f key mm
936 cache <- readIORef cached 929 where
937 let probes' = mmInsert (True,from) to $ probes cache 930 f Nothing = Just $ Set.singleton val
938 writeIORef cached (cache { probes=probes' }) 931 f (Just set) = Just $ Set.insert val set
939 cacheCmd (Solicitation from to) cached = do
940 cache <- readIORef cached
941 let probes' = mmInsert (False,from) to $ probes cache
942 writeIORef cached (cache { probes=probes' })
943 cacheCmd (Approval from to) cached = do
944 cache <- readIORef cached
945 let approvals' = mmInsert (True,from) to $ approvals cache
946 writeIORef cached (cache { approvals=approvals' })
947 cacheCmd (Rejection from to) cached = do
948 cache <- readIORef cached
949 let approvals' = mmInsert (False,from) to $ approvals cache
950 writeIORef cached (cache { approvals=approvals' })
951
952 fix $ \sendmsgs -> do
953 connected <- liftIO . async $ connect' (peerAddr peer) port
954
955 sock <- MaybeT . fix $ \loop -> do
956 e <- atomically $ orElse
957 (fmap Right $ waitSTM connected)
958 (fmap Left $ readTChan chan)
959 case e of
960 Left cmd -> cacheCmd cmd cached >> loop
961 Right sock -> return sock
962
963 retry <- do
964 (cache,snk) <- liftIO $ do
965 h <- socketToHandle sock ReadWriteMode
966 hSetBuffering h NoBuffering
967 cache <- readIORef $ cached
968 -- hint garbage collector: we're done with this...
969 writeIORef cached newCache
970 return (cache,packetSink h)
971 MaybeT $ handleOutgoingToPeer (restrictSocket sock) cache chan snk
972
973 liftIO $ cacheCmd retry cached
974 liftIO $ putStrLn $ "retrying " ++ show retry
975 sendmsgs
976 932
977 933
978greetPeer = 934greetPeer =
@@ -1139,6 +1095,7 @@ connect' addr port = do
1139 ) 1095 )
1140 1096
1141 1097
1098{-
1142sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO () 1099sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO ()
1143sendMessage cons msg peer0 = do 1100sendMessage cons msg peer0 = do
1144 let peer = discardPort peer0 1101 let peer = discardPort peer0
@@ -1170,11 +1127,12 @@ sendMessage cons msg peer0 = do
1170 atomically $ writeTChan (fst entry) msg 1127 atomically $ writeTChan (fst entry) msg
1171 when is_new . atomically $ 1128 when is_new . atomically $
1172 readTVar cons >>= writeTVar cons . Map.insert peer entry 1129 readTVar cons >>= writeTVar cons . Map.insert peer entry
1130-}
1173 1131
1174 1132
1175 1133
1176seekRemotePeers :: JabberPeerSession config => 1134seekRemotePeers :: JabberPeerSession config =>
1177 XMPPPeerClass config -> TChan Presence -> TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> IO b0 1135 XMPPPeerClass config -> TChan Presence -> OutgoingConnections CachedMessages -> IO b0
1178seekRemotePeers config chan server_connections = do 1136seekRemotePeers config chan server_connections = do
1179 fix $ \loop -> do 1137 fix $ \loop -> do
1180 event <- atomically $ readTChan chan 1138 event <- atomically $ readTChan chan
diff --git a/Presence/main.hs b/Presence/main.hs
index 317dc71e..3d5ac046 100644
--- a/Presence/main.hs
+++ b/Presence/main.hs
@@ -89,7 +89,7 @@ data PresenceState = PresenceState
89 , remoteUsers :: TVar (Map Peer (RefCount,TVar (MultiMap JabberUser (JabberResource,JabberShow)))) 89 , remoteUsers :: TVar (Map Peer (RefCount,TVar (MultiMap JabberUser (JabberResource,JabberShow))))
90 90
91 -- outGoingConnections - a set of channels that may be used to send messages to remote peers. 91 -- outGoingConnections - a set of channels that may be used to send messages to remote peers.
92 , outGoingConnections :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) 92 , outGoingConnections :: OutgoingConnections CachedMessages
93 } 93 }
94 94
95 95
@@ -108,7 +108,7 @@ newPresenceState hostname = atomically $ do
108 locals_greedy <- newEmptyTMVar 108 locals_greedy <- newEmptyTMVar
109 rchan <- newEmptyTMVar 109 rchan <- newEmptyTMVar
110 remotes <- newTVar (Map.empty) 110 remotes <- newTVar (Map.empty)
111 server_connections <- newServerConnections 111 server_connections <- newOutgoingConnections toPeer
112 return $ PresenceState hostname tty us subs locals_greedy rchan remotes server_connections 112 return $ PresenceState hostname tty us subs locals_greedy rchan remotes server_connections
113 113
114 114