diff options
author | joe <joe@jerkface.net> | 2013-07-14 03:36:43 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2013-07-14 03:36:43 -0400 |
commit | a400eb11dcd5d8e3a94f2d3505a6b66c63cf54f5 (patch) | |
tree | bbf56c74adaf35d014c73f718424a3a7d864a5ed | |
parent | 36339ba1369717858639299a323bd0dac7f95c51 (diff) |
Factored the cache and retry pattern into module SendMessage
-rw-r--r-- | Presence/SendMessage.hs (renamed from SendMessage.hs) | 0 | ||||
-rw-r--r-- | Presence/XMPP.hs | 104 | ||||
-rw-r--r-- | Presence/main.hs | 4 |
3 files changed, 33 insertions, 75 deletions
diff --git a/SendMessage.hs b/Presence/SendMessage.hs index 6e8ea2b9..6e8ea2b9 100644 --- a/SendMessage.hs +++ b/Presence/SendMessage.hs | |||
diff --git a/Presence/XMPP.hs b/Presence/XMPP.hs index a469c08e..339cc6f2 100644 --- a/Presence/XMPP.hs +++ b/Presence/XMPP.hs | |||
@@ -1,6 +1,7 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | 1 | {-# LANGUAGE OverloadedStrings #-} |
2 | {-# LANGUAGE FlexibleContexts #-} | 2 | {-# LANGUAGE FlexibleContexts #-} |
3 | {-# LANGUAGE ViewPatterns #-} | 3 | {-# LANGUAGE ViewPatterns #-} |
4 | {-# LANGUAGE TypeFamilies #-} | ||
4 | module XMPP | 5 | module XMPP |
5 | ( module XMPPTypes | 6 | ( module XMPPTypes |
6 | , listenForXmppClients | 7 | , listenForXmppClients |
@@ -9,6 +10,10 @@ module XMPP | |||
9 | , seekRemotePeers | 10 | , seekRemotePeers |
10 | , quitListening | 11 | , quitListening |
11 | , OutBoundMessage(..) | 12 | , OutBoundMessage(..) |
13 | , OutgoingConnections | ||
14 | , CachedMessages | ||
15 | , toPeer | ||
16 | , newOutgoingConnections | ||
12 | , sendMessage | 17 | , sendMessage |
13 | ) where | 18 | ) where |
14 | 19 | ||
@@ -17,14 +22,13 @@ import XMPPTypes | |||
17 | import ByteStringOperators | 22 | import ByteStringOperators |
18 | import ControlMaybe | 23 | import ControlMaybe |
19 | import XMLToByteStrings | 24 | import XMLToByteStrings |
20 | 25 | import SendMessage | |
21 | 26 | ||
22 | import Data.Maybe (catMaybes) | 27 | import Data.Maybe (catMaybes) |
23 | import Data.HList | 28 | import Data.HList |
24 | import Network.Socket | 29 | import Network.Socket |
25 | ( Family | 30 | ( Family |
26 | , connect | 31 | , connect |
27 | , socketToHandle | ||
28 | , sClose | 32 | , sClose |
29 | , Socket(..) | 33 | , Socket(..) |
30 | , socket | 34 | , socket |
@@ -33,11 +37,6 @@ import Network.Socket | |||
33 | import Network.BSD | 37 | import Network.BSD |
34 | ( getProtocolNumber | 38 | ( getProtocolNumber |
35 | ) | 39 | ) |
36 | import System.IO | ||
37 | ( BufferMode(..) | ||
38 | , IOMode(..) | ||
39 | , hSetBuffering | ||
40 | ) | ||
41 | import Control.Concurrent.STM | 40 | import Control.Concurrent.STM |
42 | import Data.Conduit | 41 | import Data.Conduit |
43 | import Data.ByteString (ByteString) | 42 | import Data.ByteString (ByteString) |
@@ -45,7 +44,6 @@ import qualified Data.ByteString.Lazy.Char8 as L | |||
45 | ( putStrLn | 44 | ( putStrLn |
46 | , fromChunks | 45 | , fromChunks |
47 | ) | 46 | ) |
48 | import Control.Concurrent (forkIO) | ||
49 | import Control.Concurrent.Async | 47 | import Control.Concurrent.Async |
50 | import Control.Exception as E | 48 | import Control.Exception as E |
51 | ( handle | 49 | ( handle |
@@ -72,11 +70,6 @@ import Data.Set as Set (Set,(\\)) | |||
72 | import qualified Data.Set as Set | 70 | import qualified Data.Set as Set |
73 | import qualified Data.Map as Map | 71 | import qualified Data.Map as Map |
74 | import Data.Map as Map (Map) | 72 | import Data.Map as Map (Map) |
75 | import GHC.Conc | ||
76 | ( threadStatus | ||
77 | , ThreadStatus(..) | ||
78 | , ThreadId | ||
79 | ) | ||
80 | 73 | ||
81 | textToByteString x = L.fromChunks [S.encodeUtf8 x] | 74 | textToByteString x = L.fromChunks [S.encodeUtf8 x] |
82 | 75 | ||
@@ -914,65 +907,28 @@ data CachedMessages = CachedMessages | |||
914 | , probes :: Map JID (Set (Bool,JID)) -- False means solicitation rather than probe | 907 | , probes :: Map JID (Set (Bool,JID)) -- False means solicitation rather than probe |
915 | , approvals :: Map JID (Set (Bool,JID) ) -- False means rejection rather than approval | 908 | , approvals :: Map JID (Set (Bool,JID) ) -- False means rejection rather than approval |
916 | } | 909 | } |
917 | newCache = CachedMessages Map.empty Map.empty Map.empty | 910 | |
918 | 911 | instance CommandCache CachedMessages where | |
919 | connect_to_server chan peer = (>> return ()) . runMaybeT $ do | 912 | type CacheableCommand CachedMessages = OutBoundMessage |
920 | let port = 5269 :: Int | 913 | emptyCache = CachedMessages Map.empty Map.empty Map.empty |
921 | -- We'll cache Presence notifications until the socket | 914 | |
922 | -- is ready. | 915 | updateCache (OutBoundPresence (Presence jid Offline)) cache = |
923 | cached <- liftIO $ newIORef newCache | 916 | cache { presences=Map.delete jid . presences $ cache } |
924 | 917 | updateCache (OutBoundPresence p@(Presence jid st)) cache = | |
925 | let mmInsert val key mm = Map.alter f key mm | 918 | cache { presences=Map.insert jid st . presences $ cache } |
926 | where | 919 | updateCache (PresenceProbe from to) cache = |
927 | f Nothing = Just $ Set.singleton val | 920 | cache { probes = mmInsert (True,from) to $ probes cache } |
928 | f (Just set) = Just $ Set.insert val set | 921 | updateCache (Solicitation from to) cache = |
929 | let cacheCmd (OutBoundPresence (Presence jid Offline)) cached = do | 922 | cache { probes= mmInsert (False,from) to $ probes cache } |
930 | cache <- readIORef cached | 923 | updateCache (Approval from to) cache = |
931 | writeIORef cached (cache { presences=Map.delete jid . presences $ cache }) | 924 | cache { approvals= mmInsert (True,from) to $ approvals cache } |
932 | cacheCmd (OutBoundPresence p@(Presence jid st)) cached = do | 925 | updateCache (Rejection from to) cache = |
933 | cache <- readIORef cached | 926 | cache { approvals= mmInsert (False,from) to $ approvals cache } |
934 | writeIORef cached (cache { presences=Map.insert jid st . presences $ cache }) | 927 | |
935 | cacheCmd (PresenceProbe from to) cached = do | 928 | mmInsert val key mm = Map.alter f key mm |
936 | cache <- readIORef cached | 929 | where |
937 | let probes' = mmInsert (True,from) to $ probes cache | 930 | f Nothing = Just $ Set.singleton val |
938 | writeIORef cached (cache { probes=probes' }) | 931 | f (Just set) = Just $ Set.insert val set |
939 | cacheCmd (Solicitation from to) cached = do | ||
940 | cache <- readIORef cached | ||
941 | let probes' = mmInsert (False,from) to $ probes cache | ||
942 | writeIORef cached (cache { probes=probes' }) | ||
943 | cacheCmd (Approval from to) cached = do | ||
944 | cache <- readIORef cached | ||
945 | let approvals' = mmInsert (True,from) to $ approvals cache | ||
946 | writeIORef cached (cache { approvals=approvals' }) | ||
947 | cacheCmd (Rejection from to) cached = do | ||
948 | cache <- readIORef cached | ||
949 | let approvals' = mmInsert (False,from) to $ approvals cache | ||
950 | writeIORef cached (cache { approvals=approvals' }) | ||
951 | |||
952 | fix $ \sendmsgs -> do | ||
953 | connected <- liftIO . async $ connect' (peerAddr peer) port | ||
954 | |||
955 | sock <- MaybeT . fix $ \loop -> do | ||
956 | e <- atomically $ orElse | ||
957 | (fmap Right $ waitSTM connected) | ||
958 | (fmap Left $ readTChan chan) | ||
959 | case e of | ||
960 | Left cmd -> cacheCmd cmd cached >> loop | ||
961 | Right sock -> return sock | ||
962 | |||
963 | retry <- do | ||
964 | (cache,snk) <- liftIO $ do | ||
965 | h <- socketToHandle sock ReadWriteMode | ||
966 | hSetBuffering h NoBuffering | ||
967 | cache <- readIORef $ cached | ||
968 | -- hint garbage collector: we're done with this... | ||
969 | writeIORef cached newCache | ||
970 | return (cache,packetSink h) | ||
971 | MaybeT $ handleOutgoingToPeer (restrictSocket sock) cache chan snk | ||
972 | |||
973 | liftIO $ cacheCmd retry cached | ||
974 | liftIO $ putStrLn $ "retrying " ++ show retry | ||
975 | sendmsgs | ||
976 | 932 | ||
977 | 933 | ||
978 | greetPeer = | 934 | greetPeer = |
@@ -1139,6 +1095,7 @@ connect' addr port = do | |||
1139 | ) | 1095 | ) |
1140 | 1096 | ||
1141 | 1097 | ||
1098 | {- | ||
1142 | sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO () | 1099 | sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO () |
1143 | sendMessage cons msg peer0 = do | 1100 | sendMessage cons msg peer0 = do |
1144 | let peer = discardPort peer0 | 1101 | let peer = discardPort peer0 |
@@ -1170,11 +1127,12 @@ sendMessage cons msg peer0 = do | |||
1170 | atomically $ writeTChan (fst entry) msg | 1127 | atomically $ writeTChan (fst entry) msg |
1171 | when is_new . atomically $ | 1128 | when is_new . atomically $ |
1172 | readTVar cons >>= writeTVar cons . Map.insert peer entry | 1129 | readTVar cons >>= writeTVar cons . Map.insert peer entry |
1130 | -} | ||
1173 | 1131 | ||
1174 | 1132 | ||
1175 | 1133 | ||
1176 | seekRemotePeers :: JabberPeerSession config => | 1134 | seekRemotePeers :: JabberPeerSession config => |
1177 | XMPPPeerClass config -> TChan Presence -> TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> IO b0 | 1135 | XMPPPeerClass config -> TChan Presence -> OutgoingConnections CachedMessages -> IO b0 |
1178 | seekRemotePeers config chan server_connections = do | 1136 | seekRemotePeers config chan server_connections = do |
1179 | fix $ \loop -> do | 1137 | fix $ \loop -> do |
1180 | event <- atomically $ readTChan chan | 1138 | event <- atomically $ readTChan chan |
diff --git a/Presence/main.hs b/Presence/main.hs index 317dc71e..3d5ac046 100644 --- a/Presence/main.hs +++ b/Presence/main.hs | |||
@@ -89,7 +89,7 @@ data PresenceState = PresenceState | |||
89 | , remoteUsers :: TVar (Map Peer (RefCount,TVar (MultiMap JabberUser (JabberResource,JabberShow)))) | 89 | , remoteUsers :: TVar (Map Peer (RefCount,TVar (MultiMap JabberUser (JabberResource,JabberShow)))) |
90 | 90 | ||
91 | -- outGoingConnections - a set of channels that may be used to send messages to remote peers. | 91 | -- outGoingConnections - a set of channels that may be used to send messages to remote peers. |
92 | , outGoingConnections :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) | 92 | , outGoingConnections :: OutgoingConnections CachedMessages |
93 | } | 93 | } |
94 | 94 | ||
95 | 95 | ||
@@ -108,7 +108,7 @@ newPresenceState hostname = atomically $ do | |||
108 | locals_greedy <- newEmptyTMVar | 108 | locals_greedy <- newEmptyTMVar |
109 | rchan <- newEmptyTMVar | 109 | rchan <- newEmptyTMVar |
110 | remotes <- newTVar (Map.empty) | 110 | remotes <- newTVar (Map.empty) |
111 | server_connections <- newServerConnections | 111 | server_connections <- newOutgoingConnections toPeer |
112 | return $ PresenceState hostname tty us subs locals_greedy rchan remotes server_connections | 112 | return $ PresenceState hostname tty us subs locals_greedy rchan remotes server_connections |
113 | 113 | ||
114 | 114 | ||