1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
{-# LANGUAGE CPP #-}
{-# LANGUAGE TypeFamilies #-}
module SendMessage
( sendMessage
, CommandCache(..)
, newOutgoingConnections
, OutgoingConnections
) where
import Control.Monad
import Control.Monad.Fix
import Control.Monad.IO.Class
import Control.Monad.Trans.Maybe
import Control.Concurrent.STM
import Control.Concurrent (forkIO)
import Control.Concurrent.Async (async,waitSTM)
import Control.Exception as E
( bracketOnError
, finally
)
import GHC.Conc
( threadStatus
, ThreadStatus(..)
, ThreadId
)
import Data.IORef
import qualified Data.Map as Map
import Data.Map as Map (Map)
import Data.XML.Types as XML (Event)
import System.IO
( BufferMode(..)
, IOMode(..)
, hSetBuffering
)
import Network.BSD
( getProtocolNumber
)
import Network.Socket
( connect
, socketToHandle
, sClose
, Socket(..)
, socket
, SocketType(..)
)
import XMPPTypes (Peer(..),discardPort,peerAddr,withPort,socketFamily)
import SocketLike
import ServerC (packetSink)
import ControlMaybe (handleIO,handleIO_)
import Data.Conduit (Sink,Source)
import qualified Data.ByteString as S (ByteString)
import XMLToByteStrings
type ByteStringSink = Sink S.ByteString IO ()
-- |Strict version of 'modifyIORef'
modifyIORef' :: IORef a -> (a -> a) -> IO ()
modifyIORef' ref f = do
x <- readIORef ref
let x' = f x
x' `seq` writeIORef ref x'
class CommandCache cache where
type CacheableCommand cache
emptyCache :: cache
updateCache :: CacheableCommand cache -> cache -> cache
data OutgoingConnections cache =
OutgoingConnections (TVar (Map Peer (TChan (CacheableCommand cache), ThreadId)))
(OutBoundXML RestrictedSocket cache (CacheableCommand cache))
newOutgoingConnections ::
( RestrictedSocket
-> cache
-> TChan (CacheableCommand cache)
-> (Maybe (CacheableCommand cache) -> IO ())
-> Source IO [XML.Event] )
-> STM (OutgoingConnections cache)
newOutgoingConnections interpretCommands = do
remotes <- newTVar (Map.empty)
return (OutgoingConnections remotes interpretCommands)
sendMessage
:: CommandCache a =>
OutgoingConnections a -> CacheableCommand a -> Peer -> IO ()
sendMessage (OutgoingConnections cons interpretCommands) msg peer0 = do
let peer = discardPort peer0
found <- atomically $ do
consmap <- readTVar cons
return (Map.lookup peer consmap)
let newEntry = do
chan <- atomically newTChan
t <- forkIO $ connect_to_server chan peer interpretCommands -- (newCache,updateCache,toPeer)
-- L.putStrLn $ "remote-map new: " <++> showPeer peer
return (True,(chan,t))
(is_new,entry) <- maybe newEntry
( \(chan,t) -> do
st <- threadStatus t
let running = do
-- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer
return (False,(chan,t))
died = do
-- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer
newEntry
case st of
ThreadRunning -> running
ThreadBlocked _ -> running
ThreadDied -> died
ThreadFinished -> died
)
found
-- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg
atomically $ writeTChan (fst entry) msg
when is_new . atomically $
readTVar cons >>= writeTVar cons . Map.insert peer entry
connect_to_server chan peer toPeer = (>> return ()) . runMaybeT $ do
let port = 5269 :: Int
-- We'll cache Presence notifications until the socket
-- is ready.
cached <- liftIO $ newIORef emptyCache
let cacheCmd msg cached = modifyIORef' cached (updateCache msg)
fix $ \sendmsgs -> do
connected <- liftIO . async $ connect' (peerAddr peer) port
sock <- MaybeT . fix $ \loop -> do
e <- atomically $ orElse
(fmap Right $ waitSTM connected)
(fmap Left $ readTChan chan)
case e of
Left cmd -> cacheCmd cmd cached >> loop
Right sock -> return sock
retry <- do
(cache,snk) <- liftIO $ do
h <- socketToHandle sock ReadWriteMode
hSetBuffering h NoBuffering
cache <- readIORef $ cached
-- hint garbage collector: we're done with this...
writeIORef cached emptyCache
return (cache,packetSink h)
MaybeT $ handleOutgoingToPeer toPeer (restrictSocket sock) cache chan snk
liftIO $ cacheCmd retry cached
-- liftIO $ putStrLn $ "retrying " ++ show retry
sendmsgs
connect' :: SockAddr -> Int -> IO (Maybe Socket)
connect' addr port = do
proto <- getProtocolNumber "tcp"
let getport (SockAddrInet port _) = port
getport (SockAddrInet6 port _ _ _) = port
let doException e = do
-- L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e
return Nothing
handleIO doException
$ tryToConnect proto (addr `withPort` port)
where
tryToConnect proto addr =
bracketOnError
(socket (socketFamily addr) Stream proto)
(sClose ) -- only done if there's an error
(\sock -> do
connect sock addr
return (Just sock) -- socketToHandle sock ReadWriteMode
)
type OutBoundXML sock cache msg =
sock
-> cache
-> TChan msg
-> (Maybe msg -> IO ())
-> Source IO [XML.Event]
handleOutgoingToPeer
:: SocketLike sock =>
OutBoundXML sock cache msg
-> sock
-> cache
-> TChan msg
-> ByteStringSink
-> IO (Maybe msg)
handleOutgoingToPeer toPeer sock cache chan snk = do
p <- getPeerName sock
-- L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p)
failed <- newIORef Nothing
let failure cmd = do
writeIORef failed cmd
-- putStrLn $ "Failed: " ++ show cmd
finally (
handleIO_ (return ()) $ toPeer sock cache chan failure `xmlToByteStrings` snk
) $ return () -- logging L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p)
readIORef failed
|