summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2013-07-14 02:12:52 -0400
committerjoe <joe@jerkface.net>2013-07-14 02:12:52 -0400
commitfa1b497fe08a85b79061ec663e78a869fcad3003 (patch)
tree018c013a0e6620c0ac7a0ce4aba05d583989f27b
parent9c52669ffde25a15066df74666fd2d8eb0bf0438 (diff)
experimental module SendMessage
-rw-r--r--SendMessage.hs208
1 files changed, 208 insertions, 0 deletions
diff --git a/SendMessage.hs b/SendMessage.hs
new file mode 100644
index 00000000..c6eec01e
--- /dev/null
+++ b/SendMessage.hs
@@ -0,0 +1,208 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE TypeFamilies #-}
3module SendMessage where
4
5import Control.Monad
6import Control.Monad.Fix
7import Control.Monad.IO.Class
8import Control.Monad.Trans.Class
9import Control.Monad.Trans.Maybe
10import Control.Concurrent.STM
11import Control.Concurrent (forkIO)
12import Control.Concurrent.Async (async,waitSTM)
13import Control.Exception as E
14 ( bracketOnError
15 , finally
16 )
17import GHC.Conc
18 ( threadStatus
19 , ThreadStatus(..)
20 , ThreadId
21 )
22import Data.IORef
23import qualified Data.Map as Map
24import Data.Map as Map (Map)
25import qualified Data.Set as Set
26import Data.Set as Set (Set,(\\))
27import Data.XML.Types as XML (Event)
28import System.IO
29 ( BufferMode(..)
30 , IOMode(..)
31 , hSetBuffering
32 )
33import Network.BSD
34 ( getProtocolNumber
35 )
36import Network.Socket
37 ( Family
38 , connect
39 , socketToHandle
40 , sClose
41 , Socket(..)
42 , socket
43 , SocketType(..)
44 )
45
46import Todo
47import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily)
48import SocketLike
49import ServerC (packetSink)
50import ControlMaybe (handleIO,handleIO_)
51import Data.Conduit (Sink,Source)
52import qualified Data.ByteString as S (ByteString)
53import XMLToByteStrings
54
55type ByteStringSink = Data.Conduit.Sink S.ByteString IO ()
56
57type OutgoingToPeer sock cache msg =
58 sock
59 -> cache
60 -> TChan msg
61 -> ByteStringSink
62 -> IO (Maybe msg)
63
64class CacheableCommand cmd where
65 type CacheRef cmd :: *
66 type Cache cmd :: *
67
68 emptyCache :: CacheRef cmd
69
70 cache :: cmd -> CacheRef cmd -> IO ()
71
72 interpretCommands :: SocketLike sock =>
73 sock -> Cache cmd -> TChan cmd -> ByteStringSink -> IO (Maybe cmd)
74
75
76sendMessage ::
77 (cache, msg -> IORef cache -> IO (), OutBoundXML RestrictedSocket cache msg) -> TVar (Map Peer (TChan msg, ThreadId)) -> msg -> Peer -> IO ()
78sendMessage (newCache,cacheCmd,toPeer) cons msg peer0 = do
79 let peer = discardPort peer0
80 found <- atomically $ do
81 consmap <- readTVar cons
82 return (Map.lookup peer consmap)
83 let newEntry = do
84 chan <- atomically newTChan
85 t <- forkIO $ connect_to_server chan peer (newCache,cacheCmd,toPeer)
86 -- L.putStrLn $ "remote-map new: " <++> showPeer peer
87 return (True,(chan,t))
88 (is_new,entry) <- maybe newEntry
89 ( \(chan,t) -> do
90 st <- threadStatus t
91 let running = do
92 -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer
93 return (False,(chan,t))
94 died = do
95 -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer
96 newEntry
97 case st of
98 ThreadRunning -> running
99 ThreadBlocked _ -> running
100 ThreadDied -> died
101 ThreadFinished -> died
102 )
103 found
104 -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg
105 atomically $ writeTChan (fst entry) msg
106 when is_new . atomically $
107 readTVar cons >>= writeTVar cons . Map.insert peer entry
108
109
110connect_to_server chan peer (newCache,cacheCmd,toPeer) = (>> return ()) . runMaybeT $ do
111 let port = 5269 :: Int
112 -- We'll cache Presence notifications until the socket
113 -- is ready.
114 cached <- liftIO $ newIORef newCache
115
116 fix $ \sendmsgs -> do
117 connected <- liftIO . async $ connect' (peerAddr peer) port
118
119 sock <- MaybeT . fix $ \loop -> do
120 e <- atomically $ orElse
121 (fmap Right $ waitSTM connected)
122 (fmap Left $ readTChan chan)
123 case e of
124 Left cmd -> cacheCmd cmd cached >> loop
125 Right sock -> return sock
126
127 retry <- do
128 (cache,snk) <- liftIO $ do
129 h <- socketToHandle sock ReadWriteMode
130 hSetBuffering h NoBuffering
131 cache <- readIORef $ cached
132 -- hint garbage collector: we're done with this...
133 writeIORef cached newCache
134 return (cache,packetSink h)
135 MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk
136
137 liftIO $ cacheCmd retry cached
138 -- liftIO $ putStrLn $ "retrying " ++ show retry
139 sendmsgs
140
141connect' :: SockAddr -> Int -> IO (Maybe Socket)
142connect' addr port = do
143 proto <- getProtocolNumber "tcp"
144 {-
145 -- Given (host :: HostName) ...
146 let hints = defaultHints { addrFlags = [AI_ADDRCONFIG]
147 , addrProtocol = proto
148 , addrSocketType = Stream }
149 addrs <- getAddrInfo (Just hints) (Just host) (Just serv)
150 firstSuccessful $ map tryToConnect addrs
151 -}
152 let getport (SockAddrInet port _) = port
153 getport (SockAddrInet6 port _ _ _) = port
154 let doException e = do
155 -- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e
156 return Nothing
157 handleIO doException
158 $ tryToConnect proto (addr `withPort` port)
159 where
160 tryToConnect proto addr =
161 bracketOnError
162 (socket (socketFamily addr) Stream proto)
163 (sClose ) -- only done if there's an error
164 (\sock -> do
165 connect sock addr
166 return (Just sock) -- socketToHandle sock ReadWriteMode
167 )
168
169
170{-
171mmInsert val key mm = Map.alter f key mm
172 where
173 f Nothing = Just $ Set.singleton val
174 f (Just set) = Just $ Set.insert val set
175-}
176
177-- newCache = todo
178cacheCmd :: msg -> cache -> IO ()
179cacheCmd _ cached = todo
180-- toPeer = todo
181
182type OutBoundXML sock cache msg =
183 sock
184 -> cache
185 -> TChan msg
186 -> (Maybe msg -> IO ())
187 -> Source IO [XML.Event]
188
189handleOutgoingToPeer
190 :: SocketLike sock =>
191 OutBoundXML sock cache msg
192 -> sock
193 -> cache
194 -> TChan msg
195 -> ByteStringSink
196 -> IO (Maybe msg)
197handleOutgoingToPeer toPeer sock cache chan snk = do
198 p <- getPeerName sock
199 -- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p)
200 failed <- newIORef Nothing
201 let failure cmd = do
202 writeIORef failed cmd
203 -- putStrLn $ "Failed: " ++ show cmd
204 finally (
205 handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk
206 ) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p)
207 readIORef failed
208