summaryrefslogtreecommitdiff
path: root/Presence/XMPP.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Presence/XMPP.hs')
-rw-r--r--Presence/XMPP.hs104
1 files changed, 31 insertions, 73 deletions
diff --git a/Presence/XMPP.hs b/Presence/XMPP.hs
index a469c08e..339cc6f2 100644
--- a/Presence/XMPP.hs
+++ b/Presence/XMPP.hs
@@ -1,6 +1,7 @@
1{-# LANGUAGE OverloadedStrings #-} 1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE FlexibleContexts #-} 2{-# LANGUAGE FlexibleContexts #-}
3{-# LANGUAGE ViewPatterns #-} 3{-# LANGUAGE ViewPatterns #-}
4{-# LANGUAGE TypeFamilies #-}
4module XMPP 5module XMPP
5 ( module XMPPTypes 6 ( module XMPPTypes
6 , listenForXmppClients 7 , listenForXmppClients
@@ -9,6 +10,10 @@ module XMPP
9 , seekRemotePeers 10 , seekRemotePeers
10 , quitListening 11 , quitListening
11 , OutBoundMessage(..) 12 , OutBoundMessage(..)
13 , OutgoingConnections
14 , CachedMessages
15 , toPeer
16 , newOutgoingConnections
12 , sendMessage 17 , sendMessage
13 ) where 18 ) where
14 19
@@ -17,14 +22,13 @@ import XMPPTypes
17import ByteStringOperators 22import ByteStringOperators
18import ControlMaybe 23import ControlMaybe
19import XMLToByteStrings 24import XMLToByteStrings
20 25import SendMessage
21 26
22import Data.Maybe (catMaybes) 27import Data.Maybe (catMaybes)
23import Data.HList 28import Data.HList
24import Network.Socket 29import Network.Socket
25 ( Family 30 ( Family
26 , connect 31 , connect
27 , socketToHandle
28 , sClose 32 , sClose
29 , Socket(..) 33 , Socket(..)
30 , socket 34 , socket
@@ -33,11 +37,6 @@ import Network.Socket
33import Network.BSD 37import Network.BSD
34 ( getProtocolNumber 38 ( getProtocolNumber
35 ) 39 )
36import System.IO
37 ( BufferMode(..)
38 , IOMode(..)
39 , hSetBuffering
40 )
41import Control.Concurrent.STM 40import Control.Concurrent.STM
42import Data.Conduit 41import Data.Conduit
43import Data.ByteString (ByteString) 42import Data.ByteString (ByteString)
@@ -45,7 +44,6 @@ import qualified Data.ByteString.Lazy.Char8 as L
45 ( putStrLn 44 ( putStrLn
46 , fromChunks 45 , fromChunks
47 ) 46 )
48import Control.Concurrent (forkIO)
49import Control.Concurrent.Async 47import Control.Concurrent.Async
50import Control.Exception as E 48import Control.Exception as E
51 ( handle 49 ( handle
@@ -72,11 +70,6 @@ import Data.Set as Set (Set,(\\))
72import qualified Data.Set as Set 70import qualified Data.Set as Set
73import qualified Data.Map as Map 71import qualified Data.Map as Map
74import Data.Map as Map (Map) 72import Data.Map as Map (Map)
75import GHC.Conc
76 ( threadStatus
77 , ThreadStatus(..)
78 , ThreadId
79 )
80 73
81textToByteString x = L.fromChunks [S.encodeUtf8 x] 74textToByteString x = L.fromChunks [S.encodeUtf8 x]
82 75
@@ -914,65 +907,28 @@ data CachedMessages = CachedMessages
914 , probes :: Map JID (Set (Bool,JID)) -- False means solicitation rather than probe 907 , probes :: Map JID (Set (Bool,JID)) -- False means solicitation rather than probe
915 , approvals :: Map JID (Set (Bool,JID) ) -- False means rejection rather than approval 908 , approvals :: Map JID (Set (Bool,JID) ) -- False means rejection rather than approval
916 } 909 }
917newCache = CachedMessages Map.empty Map.empty Map.empty 910
918 911instance CommandCache CachedMessages where
919connect_to_server chan peer = (>> return ()) . runMaybeT $ do 912 type CacheableCommand CachedMessages = OutBoundMessage
920 let port = 5269 :: Int 913 emptyCache = CachedMessages Map.empty Map.empty Map.empty
921 -- We'll cache Presence notifications until the socket 914
922 -- is ready. 915 updateCache (OutBoundPresence (Presence jid Offline)) cache =
923 cached <- liftIO $ newIORef newCache 916 cache { presences=Map.delete jid . presences $ cache }
924 917 updateCache (OutBoundPresence p@(Presence jid st)) cache =
925 let mmInsert val key mm = Map.alter f key mm 918 cache { presences=Map.insert jid st . presences $ cache }
926 where 919 updateCache (PresenceProbe from to) cache =
927 f Nothing = Just $ Set.singleton val 920 cache { probes = mmInsert (True,from) to $ probes cache }
928 f (Just set) = Just $ Set.insert val set 921 updateCache (Solicitation from to) cache =
929 let cacheCmd (OutBoundPresence (Presence jid Offline)) cached = do 922 cache { probes= mmInsert (False,from) to $ probes cache }
930 cache <- readIORef cached 923 updateCache (Approval from to) cache =
931 writeIORef cached (cache { presences=Map.delete jid . presences $ cache }) 924 cache { approvals= mmInsert (True,from) to $ approvals cache }
932 cacheCmd (OutBoundPresence p@(Presence jid st)) cached = do 925 updateCache (Rejection from to) cache =
933 cache <- readIORef cached 926 cache { approvals= mmInsert (False,from) to $ approvals cache }
934 writeIORef cached (cache { presences=Map.insert jid st . presences $ cache }) 927
935 cacheCmd (PresenceProbe from to) cached = do 928mmInsert val key mm = Map.alter f key mm
936 cache <- readIORef cached 929 where
937 let probes' = mmInsert (True,from) to $ probes cache 930 f Nothing = Just $ Set.singleton val
938 writeIORef cached (cache { probes=probes' }) 931 f (Just set) = Just $ Set.insert val set
939 cacheCmd (Solicitation from to) cached = do
940 cache <- readIORef cached
941 let probes' = mmInsert (False,from) to $ probes cache
942 writeIORef cached (cache { probes=probes' })
943 cacheCmd (Approval from to) cached = do
944 cache <- readIORef cached
945 let approvals' = mmInsert (True,from) to $ approvals cache
946 writeIORef cached (cache { approvals=approvals' })
947 cacheCmd (Rejection from to) cached = do
948 cache <- readIORef cached
949 let approvals' = mmInsert (False,from) to $ approvals cache
950 writeIORef cached (cache { approvals=approvals' })
951
952 fix $ \sendmsgs -> do
953 connected <- liftIO . async $ connect' (peerAddr peer) port
954
955 sock <- MaybeT . fix $ \loop -> do
956 e <- atomically $ orElse
957 (fmap Right $ waitSTM connected)
958 (fmap Left $ readTChan chan)
959 case e of
960 Left cmd -> cacheCmd cmd cached >> loop
961 Right sock -> return sock
962
963 retry <- do
964 (cache,snk) <- liftIO $ do
965 h <- socketToHandle sock ReadWriteMode
966 hSetBuffering h NoBuffering
967 cache <- readIORef $ cached
968 -- hint garbage collector: we're done with this...
969 writeIORef cached newCache
970 return (cache,packetSink h)
971 MaybeT $ handleOutgoingToPeer (restrictSocket sock) cache chan snk
972
973 liftIO $ cacheCmd retry cached
974 liftIO $ putStrLn $ "retrying " ++ show retry
975 sendmsgs
976 932
977 933
978greetPeer = 934greetPeer =
@@ -1139,6 +1095,7 @@ connect' addr port = do
1139 ) 1095 )
1140 1096
1141 1097
1098{-
1142sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO () 1099sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO ()
1143sendMessage cons msg peer0 = do 1100sendMessage cons msg peer0 = do
1144 let peer = discardPort peer0 1101 let peer = discardPort peer0
@@ -1170,11 +1127,12 @@ sendMessage cons msg peer0 = do
1170 atomically $ writeTChan (fst entry) msg 1127 atomically $ writeTChan (fst entry) msg
1171 when is_new . atomically $ 1128 when is_new . atomically $
1172 readTVar cons >>= writeTVar cons . Map.insert peer entry 1129 readTVar cons >>= writeTVar cons . Map.insert peer entry
1130-}
1173 1131
1174 1132
1175 1133
1176seekRemotePeers :: JabberPeerSession config => 1134seekRemotePeers :: JabberPeerSession config =>
1177 XMPPPeerClass config -> TChan Presence -> TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> IO b0 1135 XMPPPeerClass config -> TChan Presence -> OutgoingConnections CachedMessages -> IO b0
1178seekRemotePeers config chan server_connections = do 1136seekRemotePeers config chan server_connections = do
1179 fix $ \loop -> do 1137 fix $ \loop -> do
1180 event <- atomically $ readTChan chan 1138 event <- atomically $ readTChan chan