summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--SendMessage.hs86
1 files changed, 44 insertions, 42 deletions
diff --git a/SendMessage.hs b/SendMessage.hs
index ff779e11..6e8ea2b9 100644
--- a/SendMessage.hs
+++ b/SendMessage.hs
@@ -1,11 +1,15 @@
1{-# LANGUAGE CPP #-} 1{-# LANGUAGE CPP #-}
2{-# LANGUAGE TypeFamilies #-} 2{-# LANGUAGE TypeFamilies #-}
3module SendMessage where 3module SendMessage
4 ( sendMessage
5 , CommandCache(..)
6 , newOutgoingConnections
7 , OutgoingConnections
8 ) where
4 9
5import Control.Monad 10import Control.Monad
6import Control.Monad.Fix 11import Control.Monad.Fix
7import Control.Monad.IO.Class 12import Control.Monad.IO.Class
8import Control.Monad.Trans.Class
9import Control.Monad.Trans.Maybe 13import Control.Monad.Trans.Maybe
10import Control.Concurrent.STM 14import Control.Concurrent.STM
11import Control.Concurrent (forkIO) 15import Control.Concurrent (forkIO)
@@ -22,8 +26,6 @@ import GHC.Conc
22import Data.IORef 26import Data.IORef
23import qualified Data.Map as Map 27import qualified Data.Map as Map
24import Data.Map as Map (Map) 28import Data.Map as Map (Map)
25import qualified Data.Set as Set
26import Data.Set as Set (Set,(\\))
27import Data.XML.Types as XML (Event) 29import Data.XML.Types as XML (Event)
28import System.IO 30import System.IO
29 ( BufferMode(..) 31 ( BufferMode(..)
@@ -34,8 +36,7 @@ import Network.BSD
34 ( getProtocolNumber 36 ( getProtocolNumber
35 ) 37 )
36import Network.Socket 38import Network.Socket
37 ( Family 39 ( connect
38 , connect
39 , socketToHandle 40 , socketToHandle
40 , sClose 41 , sClose
41 , Socket(..) 42 , Socket(..)
@@ -43,7 +44,6 @@ import Network.Socket
43 , SocketType(..) 44 , SocketType(..)
44 ) 45 )
45 46
46import Todo
47import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily) 47import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily)
48import SocketLike 48import SocketLike
49import ServerC (packetSink) 49import ServerC (packetSink)
@@ -52,14 +52,7 @@ import Data.Conduit (Sink,Source)
52import qualified Data.ByteString as S (ByteString) 52import qualified Data.ByteString as S (ByteString)
53import XMLToByteStrings 53import XMLToByteStrings
54 54
55type ByteStringSink = Data.Conduit.Sink S.ByteString IO () 55type ByteStringSink = Sink S.ByteString IO ()
56
57type OutgoingToPeer sock cache msg =
58 sock
59 -> cache
60 -> TChan msg
61 -> ByteStringSink
62 -> IO (Maybe msg)
63 56
64-- |Strict version of 'modifyIORef' 57-- |Strict version of 'modifyIORef'
65modifyIORef' :: IORef a -> (a -> a) -> IO () 58modifyIORef' :: IORef a -> (a -> a) -> IO ()
@@ -69,16 +62,45 @@ modifyIORef' ref f = do
69 x' `seq` writeIORef ref x' 62 x' `seq` writeIORef ref x'
70 63
71 64
72sendMessage :: 65class CommandCache cache where
73 (cache, msg -> cache -> cache, OutBoundXML RestrictedSocket cache msg) -> TVar (Map Peer (TChan msg, ThreadId)) -> msg -> Peer -> IO () 66 type CacheableCommand cache
74sendMessage (newCache,updateCache,toPeer) cons msg peer0 = do 67
68 emptyCache :: cache
69
70 updateCache :: CacheableCommand cache -> cache -> cache
71
72
73data OutgoingConnections cache =
74 OutgoingConnections (TVar (Map Peer (TChan (CacheableCommand cache), ThreadId)))
75 (OutBoundXML RestrictedSocket cache (CacheableCommand cache))
76
77
78newOutgoingConnections ::
79 ( RestrictedSocket
80 -> cache
81 -> TChan (CacheableCommand cache)
82 -> (Maybe (CacheableCommand cache) -> IO ())
83 -> Source IO [XML.Event] )
84
85 -> STM (OutgoingConnections cache)
86
87newOutgoingConnections interpretCommands = do
88 remotes <- newTVar (Map.empty)
89 return (OutgoingConnections remotes interpretCommands)
90
91
92
93sendMessage
94 :: CommandCache a =>
95 OutgoingConnections a -> CacheableCommand a -> Peer -> IO ()
96sendMessage (OutgoingConnections cons interpretCommands) msg peer0 = do
75 let peer = discardPort peer0 97 let peer = discardPort peer0
76 found <- atomically $ do 98 found <- atomically $ do
77 consmap <- readTVar cons 99 consmap <- readTVar cons
78 return (Map.lookup peer consmap) 100 return (Map.lookup peer consmap)
79 let newEntry = do 101 let newEntry = do
80 chan <- atomically newTChan 102 chan <- atomically newTChan
81 t <- forkIO $ connect_to_server chan peer (newCache,updateCache,toPeer) 103 t <- forkIO $ connect_to_server chan peer interpretCommands -- (newCache,updateCache,toPeer)
82 -- L.putStrLn $ "remote-map new: " <++> showPeer peer 104 -- L.putStrLn $ "remote-map new: " <++> showPeer peer
83 return (True,(chan,t)) 105 return (True,(chan,t))
84 (is_new,entry) <- maybe newEntry 106 (is_new,entry) <- maybe newEntry
@@ -103,11 +125,11 @@ sendMessage (newCache,updateCache,toPeer) cons msg peer0 = do
103 readTVar cons >>= writeTVar cons . Map.insert peer entry 125 readTVar cons >>= writeTVar cons . Map.insert peer entry
104 126
105 127
106connect_to_server chan peer (newCache,updateCache,toPeer) = (>> return ()) . runMaybeT $ do 128connect_to_server chan peer toPeer = (>> return ()) . runMaybeT $ do
107 let port = 5269 :: Int 129 let port = 5269 :: Int
108 -- We'll cache Presence notifications until the socket 130 -- We'll cache Presence notifications until the socket
109 -- is ready. 131 -- is ready.
110 cached <- liftIO $ newIORef newCache 132 cached <- liftIO $ newIORef emptyCache
111 133
112 let cacheCmd msg cached = modifyIORef' cached (updateCache msg) 134 let cacheCmd msg cached = modifyIORef' cached (updateCache msg)
113 135
@@ -128,7 +150,7 @@ connect_to_server chan peer (newCache,updateCache,toPeer) = (>> return ()) . run
128 hSetBuffering h NoBuffering 150 hSetBuffering h NoBuffering
129 cache <- readIORef $ cached 151 cache <- readIORef $ cached
130 -- hint garbage collector: we're done with this... 152 -- hint garbage collector: we're done with this...
131 writeIORef cached newCache 153 writeIORef cached emptyCache
132 return (cache,packetSink h) 154 return (cache,packetSink h)
133 MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk 155 MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk
134 156
@@ -139,14 +161,6 @@ connect_to_server chan peer (newCache,updateCache,toPeer) = (>> return ()) . run
139connect' :: SockAddr -> Int -> IO (Maybe Socket) 161connect' :: SockAddr -> Int -> IO (Maybe Socket)
140connect' addr port = do 162connect' addr port = do
141 proto <- getProtocolNumber "tcp" 163 proto <- getProtocolNumber "tcp"
142 {-
143 -- Given (host :: HostName) ...
144 let hints = defaultHints { addrFlags = [AI_ADDRCONFIG]
145 , addrProtocol = proto
146 , addrSocketType = Stream }
147 addrs <- getAddrInfo (Just hints) (Just host) (Just serv)
148 firstSuccessful $ map tryToConnect addrs
149 -}
150 let getport (SockAddrInet port _) = port 164 let getport (SockAddrInet port _) = port
151 getport (SockAddrInet6 port _ _ _) = port 165 getport (SockAddrInet6 port _ _ _) = port
152 let doException e = do 166 let doException e = do
@@ -165,18 +179,6 @@ connect' addr port = do
165 ) 179 )
166 180
167 181
168{-
169mmInsert val key mm = Map.alter f key mm
170 where
171 f Nothing = Just $ Set.singleton val
172 f (Just set) = Just $ Set.insert val set
173-}
174
175-- newCache = todo
176-- cacheCmd :: msg -> cache -> IO ()
177-- cacheCmd _ cached = todo
178-- toPeer = todo
179
180type OutBoundXML sock cache msg = 182type OutBoundXML sock cache msg =
181 sock 183 sock
182 -> cache 184 -> cache