diff options
Diffstat (limited to 'SendMessage.hs')
-rw-r--r-- | SendMessage.hs | 86 |
1 files changed, 44 insertions, 42 deletions
diff --git a/SendMessage.hs b/SendMessage.hs index ff779e11..6e8ea2b9 100644 --- a/SendMessage.hs +++ b/SendMessage.hs | |||
@@ -1,11 +1,15 @@ | |||
1 | {-# LANGUAGE CPP #-} | 1 | {-# LANGUAGE CPP #-} |
2 | {-# LANGUAGE TypeFamilies #-} | 2 | {-# LANGUAGE TypeFamilies #-} |
3 | module SendMessage where | 3 | module SendMessage |
4 | ( sendMessage | ||
5 | , CommandCache(..) | ||
6 | , newOutgoingConnections | ||
7 | , OutgoingConnections | ||
8 | ) where | ||
4 | 9 | ||
5 | import Control.Monad | 10 | import Control.Monad |
6 | import Control.Monad.Fix | 11 | import Control.Monad.Fix |
7 | import Control.Monad.IO.Class | 12 | import Control.Monad.IO.Class |
8 | import Control.Monad.Trans.Class | ||
9 | import Control.Monad.Trans.Maybe | 13 | import Control.Monad.Trans.Maybe |
10 | import Control.Concurrent.STM | 14 | import Control.Concurrent.STM |
11 | import Control.Concurrent (forkIO) | 15 | import Control.Concurrent (forkIO) |
@@ -22,8 +26,6 @@ import GHC.Conc | |||
22 | import Data.IORef | 26 | import Data.IORef |
23 | import qualified Data.Map as Map | 27 | import qualified Data.Map as Map |
24 | import Data.Map as Map (Map) | 28 | import Data.Map as Map (Map) |
25 | import qualified Data.Set as Set | ||
26 | import Data.Set as Set (Set,(\\)) | ||
27 | import Data.XML.Types as XML (Event) | 29 | import Data.XML.Types as XML (Event) |
28 | import System.IO | 30 | import System.IO |
29 | ( BufferMode(..) | 31 | ( BufferMode(..) |
@@ -34,8 +36,7 @@ import Network.BSD | |||
34 | ( getProtocolNumber | 36 | ( getProtocolNumber |
35 | ) | 37 | ) |
36 | import Network.Socket | 38 | import Network.Socket |
37 | ( Family | 39 | ( connect |
38 | , connect | ||
39 | , socketToHandle | 40 | , socketToHandle |
40 | , sClose | 41 | , sClose |
41 | , Socket(..) | 42 | , Socket(..) |
@@ -43,7 +44,6 @@ import Network.Socket | |||
43 | , SocketType(..) | 44 | , SocketType(..) |
44 | ) | 45 | ) |
45 | 46 | ||
46 | import Todo | ||
47 | import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily) | 47 | import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily) |
48 | import SocketLike | 48 | import SocketLike |
49 | import ServerC (packetSink) | 49 | import ServerC (packetSink) |
@@ -52,14 +52,7 @@ import Data.Conduit (Sink,Source) | |||
52 | import qualified Data.ByteString as S (ByteString) | 52 | import qualified Data.ByteString as S (ByteString) |
53 | import XMLToByteStrings | 53 | import XMLToByteStrings |
54 | 54 | ||
55 | type ByteStringSink = Data.Conduit.Sink S.ByteString IO () | 55 | type ByteStringSink = Sink S.ByteString IO () |
56 | |||
57 | type OutgoingToPeer sock cache msg = | ||
58 | sock | ||
59 | -> cache | ||
60 | -> TChan msg | ||
61 | -> ByteStringSink | ||
62 | -> IO (Maybe msg) | ||
63 | 56 | ||
64 | -- |Strict version of 'modifyIORef' | 57 | -- |Strict version of 'modifyIORef' |
65 | modifyIORef' :: IORef a -> (a -> a) -> IO () | 58 | modifyIORef' :: IORef a -> (a -> a) -> IO () |
@@ -69,16 +62,45 @@ modifyIORef' ref f = do | |||
69 | x' `seq` writeIORef ref x' | 62 | x' `seq` writeIORef ref x' |
70 | 63 | ||
71 | 64 | ||
72 | sendMessage :: | 65 | class CommandCache cache where |
73 | (cache, msg -> cache -> cache, OutBoundXML RestrictedSocket cache msg) -> TVar (Map Peer (TChan msg, ThreadId)) -> msg -> Peer -> IO () | 66 | type CacheableCommand cache |
74 | sendMessage (newCache,updateCache,toPeer) cons msg peer0 = do | 67 | |
68 | emptyCache :: cache | ||
69 | |||
70 | updateCache :: CacheableCommand cache -> cache -> cache | ||
71 | |||
72 | |||
73 | data OutgoingConnections cache = | ||
74 | OutgoingConnections (TVar (Map Peer (TChan (CacheableCommand cache), ThreadId))) | ||
75 | (OutBoundXML RestrictedSocket cache (CacheableCommand cache)) | ||
76 | |||
77 | |||
78 | newOutgoingConnections :: | ||
79 | ( RestrictedSocket | ||
80 | -> cache | ||
81 | -> TChan (CacheableCommand cache) | ||
82 | -> (Maybe (CacheableCommand cache) -> IO ()) | ||
83 | -> Source IO [XML.Event] ) | ||
84 | |||
85 | -> STM (OutgoingConnections cache) | ||
86 | |||
87 | newOutgoingConnections interpretCommands = do | ||
88 | remotes <- newTVar (Map.empty) | ||
89 | return (OutgoingConnections remotes interpretCommands) | ||
90 | |||
91 | |||
92 | |||
93 | sendMessage | ||
94 | :: CommandCache a => | ||
95 | OutgoingConnections a -> CacheableCommand a -> Peer -> IO () | ||
96 | sendMessage (OutgoingConnections cons interpretCommands) msg peer0 = do | ||
75 | let peer = discardPort peer0 | 97 | let peer = discardPort peer0 |
76 | found <- atomically $ do | 98 | found <- atomically $ do |
77 | consmap <- readTVar cons | 99 | consmap <- readTVar cons |
78 | return (Map.lookup peer consmap) | 100 | return (Map.lookup peer consmap) |
79 | let newEntry = do | 101 | let newEntry = do |
80 | chan <- atomically newTChan | 102 | chan <- atomically newTChan |
81 | t <- forkIO $ connect_to_server chan peer (newCache,updateCache,toPeer) | 103 | t <- forkIO $ connect_to_server chan peer interpretCommands -- (newCache,updateCache,toPeer) |
82 | -- L.putStrLn $ "remote-map new: " <++> showPeer peer | 104 | -- L.putStrLn $ "remote-map new: " <++> showPeer peer |
83 | return (True,(chan,t)) | 105 | return (True,(chan,t)) |
84 | (is_new,entry) <- maybe newEntry | 106 | (is_new,entry) <- maybe newEntry |
@@ -103,11 +125,11 @@ sendMessage (newCache,updateCache,toPeer) cons msg peer0 = do | |||
103 | readTVar cons >>= writeTVar cons . Map.insert peer entry | 125 | readTVar cons >>= writeTVar cons . Map.insert peer entry |
104 | 126 | ||
105 | 127 | ||
106 | connect_to_server chan peer (newCache,updateCache,toPeer) = (>> return ()) . runMaybeT $ do | 128 | connect_to_server chan peer toPeer = (>> return ()) . runMaybeT $ do |
107 | let port = 5269 :: Int | 129 | let port = 5269 :: Int |
108 | -- We'll cache Presence notifications until the socket | 130 | -- We'll cache Presence notifications until the socket |
109 | -- is ready. | 131 | -- is ready. |
110 | cached <- liftIO $ newIORef newCache | 132 | cached <- liftIO $ newIORef emptyCache |
111 | 133 | ||
112 | let cacheCmd msg cached = modifyIORef' cached (updateCache msg) | 134 | let cacheCmd msg cached = modifyIORef' cached (updateCache msg) |
113 | 135 | ||
@@ -128,7 +150,7 @@ connect_to_server chan peer (newCache,updateCache,toPeer) = (>> return ()) . run | |||
128 | hSetBuffering h NoBuffering | 150 | hSetBuffering h NoBuffering |
129 | cache <- readIORef $ cached | 151 | cache <- readIORef $ cached |
130 | -- hint garbage collector: we're done with this... | 152 | -- hint garbage collector: we're done with this... |
131 | writeIORef cached newCache | 153 | writeIORef cached emptyCache |
132 | return (cache,packetSink h) | 154 | return (cache,packetSink h) |
133 | MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk | 155 | MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk |
134 | 156 | ||
@@ -139,14 +161,6 @@ connect_to_server chan peer (newCache,updateCache,toPeer) = (>> return ()) . run | |||
139 | connect' :: SockAddr -> Int -> IO (Maybe Socket) | 161 | connect' :: SockAddr -> Int -> IO (Maybe Socket) |
140 | connect' addr port = do | 162 | connect' addr port = do |
141 | proto <- getProtocolNumber "tcp" | 163 | proto <- getProtocolNumber "tcp" |
142 | {- | ||
143 | -- Given (host :: HostName) ... | ||
144 | let hints = defaultHints { addrFlags = [AI_ADDRCONFIG] | ||
145 | , addrProtocol = proto | ||
146 | , addrSocketType = Stream } | ||
147 | addrs <- getAddrInfo (Just hints) (Just host) (Just serv) | ||
148 | firstSuccessful $ map tryToConnect addrs | ||
149 | -} | ||
150 | let getport (SockAddrInet port _) = port | 164 | let getport (SockAddrInet port _) = port |
151 | getport (SockAddrInet6 port _ _ _) = port | 165 | getport (SockAddrInet6 port _ _ _) = port |
152 | let doException e = do | 166 | let doException e = do |
@@ -165,18 +179,6 @@ connect' addr port = do | |||
165 | ) | 179 | ) |
166 | 180 | ||
167 | 181 | ||
168 | {- | ||
169 | mmInsert val key mm = Map.alter f key mm | ||
170 | where | ||
171 | f Nothing = Just $ Set.singleton val | ||
172 | f (Just set) = Just $ Set.insert val set | ||
173 | -} | ||
174 | |||
175 | -- newCache = todo | ||
176 | -- cacheCmd :: msg -> cache -> IO () | ||
177 | -- cacheCmd _ cached = todo | ||
178 | -- toPeer = todo | ||
179 | |||
180 | type OutBoundXML sock cache msg = | 182 | type OutBoundXML sock cache msg = |
181 | sock | 183 | sock |
182 | -> cache | 184 | -> cache |